You are viewing a plain-text version of this content. (The link to the canonical version was a hyperlink in the original page and is not preserved in this plain-text rendering.)
Posted to commits@parquet.apache.org by we...@apache.org on 2016/12/30 16:36:36 UTC
[2/4] parquet-cpp git commit: PARQUET-818: Refactoring to utilize common IO, buffer, memory management abstractions and implementations
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/input.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.h b/src/parquet/util/input.h
deleted file mode 100644
index 1bb41e3..0000000
--- a/src/parquet/util/input.h
+++ /dev/null
@@ -1,211 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_UTIL_INPUT_H
-#define PARQUET_UTIL_INPUT_H
-
-#include <cstdint>
-#include <cstdio>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "parquet/util/mem-allocator.h"
-#include "parquet/util/visibility.h"
-
-namespace parquet {
-
-class Buffer;
-class OwnedMutableBuffer;
-
-// ----------------------------------------------------------------------
-// Random access input (e.g. file-like)
-
-// Random
-class PARQUET_EXPORT RandomAccessSource {
- public:
- virtual ~RandomAccessSource() {}
-
- virtual void Close() = 0;
- virtual int64_t Tell() const = 0;
- virtual void Seek(int64_t pos) = 0;
- int64_t Size() const;
-
- // Returns actual number of bytes read
- virtual int64_t Read(int64_t nbytes, uint8_t* out) = 0;
-
- virtual std::shared_ptr<Buffer> Read(int64_t nbytes) = 0;
- std::shared_ptr<Buffer> ReadAt(int64_t pos, int64_t nbytes);
-
- protected:
- int64_t size_;
-};
-
-// ----------------------------------------------------------------------
-// Implementations of RandomAccessSource used for testing and internal CLI tools.
-// May not be sufficiently robust for general production use.
-
-class PARQUET_EXPORT LocalFileSource : public RandomAccessSource {
- public:
- explicit LocalFileSource(MemoryAllocator* allocator = default_allocator())
- : file_(nullptr), is_open_(false), allocator_(allocator) {}
-
- virtual ~LocalFileSource();
-
- virtual void Open(const std::string& path);
-
- virtual void Close();
- virtual int64_t Tell() const;
- virtual void Seek(int64_t pos);
-
- // Returns actual number of bytes read
- virtual int64_t Read(int64_t nbytes, uint8_t* out);
-
- virtual std::shared_ptr<Buffer> Read(int64_t nbytes);
-
- bool is_open() const { return is_open_; }
- const std::string& path() const { return path_; }
-
- // Return the integer file descriptor
- int file_descriptor() const;
-
- protected:
- void CloseFile();
- void SeekFile(int64_t pos, int origin = SEEK_SET);
-
- std::string path_;
- FILE* file_;
- bool is_open_;
- MemoryAllocator* allocator_;
-};
-
-class PARQUET_EXPORT MemoryMapSource : public LocalFileSource {
- public:
- explicit MemoryMapSource(MemoryAllocator* allocator = default_allocator())
- : LocalFileSource(allocator), data_(nullptr), pos_(0) {}
-
- virtual ~MemoryMapSource();
-
- virtual void Close();
- virtual void Open(const std::string& path);
-
- virtual int64_t Tell() const;
- virtual void Seek(int64_t pos);
-
- // Copy data from memory map into out (must be already allocated memory)
- // @returns: actual number of bytes read
- virtual int64_t Read(int64_t nbytes, uint8_t* out);
-
- // Return a buffer referencing memory-map (no copy)
- virtual std::shared_ptr<Buffer> Read(int64_t nbytes);
-
- private:
- void CloseFile();
-
- uint8_t* data_;
- int64_t pos_;
-};
-
-// ----------------------------------------------------------------------
-// A file-like object that reads from virtual address space
-
-class PARQUET_EXPORT BufferReader : public RandomAccessSource {
- public:
- explicit BufferReader(const std::shared_ptr<Buffer>& buffer);
- virtual void Close() {}
- virtual int64_t Tell() const;
- virtual void Seek(int64_t pos);
-
- virtual int64_t Read(int64_t nbytes, uint8_t* out);
-
- virtual std::shared_ptr<Buffer> Read(int64_t nbytes);
-
- protected:
- const uint8_t* Head() { return data_ + pos_; }
-
- std::shared_ptr<Buffer> buffer_;
- const uint8_t* data_;
- int64_t pos_;
-};
-
-// ----------------------------------------------------------------------
-// Streaming input interfaces
-
-// Interface for the column reader to get the bytes. The interface is a stream
-// interface, meaning the bytes in order and once a byte is read, it does not
-// need to be read again.
-class InputStream {
- public:
- // Returns the next 'num_to_peek' without advancing the current position.
- // *num_bytes will contain the number of bytes returned which can only be
- // less than num_to_peek at end of stream cases.
- // Since the position is not advanced, calls to this function are idempotent.
- // The buffer returned to the caller is still owned by the input stream and must
- // stay valid until the next call to Peek() or Read().
- virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) = 0;
-
- // Identical to Peek(), except the current position in the stream is advanced by
- // *num_bytes.
- virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) = 0;
-
- // Advance the stream without reading
- virtual void Advance(int64_t num_bytes) = 0;
-
- virtual ~InputStream() {}
-
- protected:
- InputStream() {}
-};
-
-// Implementation of an InputStream when all the bytes are in memory.
-class InMemoryInputStream : public InputStream {
- public:
- InMemoryInputStream(RandomAccessSource* source, int64_t start, int64_t end);
- explicit InMemoryInputStream(const std::shared_ptr<Buffer>& buffer);
- virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes);
- virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes);
-
- virtual void Advance(int64_t num_bytes);
-
- private:
- std::shared_ptr<Buffer> buffer_;
- int64_t len_;
- int64_t offset_;
-};
-
-// Implementation of an InputStream when only some of the bytes are in memory.
-class BufferedInputStream : public InputStream {
- public:
- BufferedInputStream(MemoryAllocator* pool, int64_t buffer_size,
- RandomAccessSource* source, int64_t start, int64_t end);
- virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes);
- virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes);
-
- virtual void Advance(int64_t num_bytes);
-
- private:
- std::shared_ptr<OwnedMutableBuffer> buffer_;
- RandomAccessSource* source_;
- int64_t stream_offset_;
- int64_t stream_end_;
- int64_t buffer_offset_;
- int64_t buffer_size_;
-};
-
-} // namespace parquet
-
-#endif // PARQUET_UTIL_INPUT_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/mem-allocator-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-allocator-test.cc b/src/parquet/util/mem-allocator-test.cc
deleted file mode 100644
index 336d3b4..0000000
--- a/src/parquet/util/mem-allocator-test.cc
+++ /dev/null
@@ -1,67 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include "parquet/exception.h"
-#include "parquet/util/mem-allocator.h"
-
-namespace parquet {
-
-TEST(TestAllocator, AllocateFree) {
- TrackingAllocator allocator;
-
- uint8_t* data = allocator.Malloc(100);
- ASSERT_TRUE(nullptr != data);
- data[99] = 55;
- allocator.Free(data, 100);
-
- data = allocator.Malloc(0);
- ASSERT_EQ(nullptr, data);
- allocator.Free(data, 0);
-
- data = allocator.Malloc(1);
- ASSERT_THROW(allocator.Free(data, 2), ParquetException);
- ASSERT_NO_THROW(allocator.Free(data, 1));
-
- int64_t to_alloc = std::numeric_limits<int64_t>::max();
- ASSERT_THROW(allocator.Malloc(to_alloc), ParquetException);
-}
-
-TEST(TestAllocator, TotalMax) {
- TrackingAllocator allocator;
- ASSERT_EQ(0, allocator.TotalMemory());
- ASSERT_EQ(0, allocator.MaxMemory());
-
- uint8_t* data = allocator.Malloc(100);
- ASSERT_EQ(100, allocator.TotalMemory());
- ASSERT_EQ(100, allocator.MaxMemory());
-
- uint8_t* data2 = allocator.Malloc(10);
- ASSERT_EQ(110, allocator.TotalMemory());
- ASSERT_EQ(110, allocator.MaxMemory());
-
- allocator.Free(data, 100);
- ASSERT_EQ(10, allocator.TotalMemory());
- ASSERT_EQ(110, allocator.MaxMemory());
-
- allocator.Free(data2, 10);
- ASSERT_EQ(0, allocator.TotalMemory());
- ASSERT_EQ(110, allocator.MaxMemory());
-}
-
-} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/mem-allocator.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-allocator.cc b/src/parquet/util/mem-allocator.cc
deleted file mode 100644
index 2b6592d..0000000
--- a/src/parquet/util/mem-allocator.cc
+++ /dev/null
@@ -1,61 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/util/mem-allocator.h"
-
-#include <cstdlib>
-
-#include "parquet/exception.h"
-
-namespace parquet {
-
-MemoryAllocator::~MemoryAllocator() {}
-
-uint8_t* TrackingAllocator::Malloc(int64_t size) {
- if (0 == size) { return nullptr; }
-
- uint8_t* p = static_cast<uint8_t*>(std::malloc(size));
- if (!p) { throw ParquetException("OOM: memory allocation failed"); }
- {
- std::lock_guard<std::mutex> lock(stats_mutex_);
- total_memory_ += size;
- if (total_memory_ > max_memory_) { max_memory_ = total_memory_; }
- }
- return p;
-}
-
-void TrackingAllocator::Free(uint8_t* p, int64_t size) {
- if (nullptr != p && size > 0) {
- {
- std::lock_guard<std::mutex> lock(stats_mutex_);
- if (total_memory_ < size) {
- throw ParquetException("Attempting to free too much memory");
- }
- total_memory_ -= size;
- }
- std::free(p);
- }
-}
-
-TrackingAllocator::~TrackingAllocator() {}
-
-MemoryAllocator* default_allocator() {
- static TrackingAllocator default_allocator;
- return &default_allocator;
-}
-
-} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/mem-allocator.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-allocator.h b/src/parquet/util/mem-allocator.h
deleted file mode 100644
index a0f3693..0000000
--- a/src/parquet/util/mem-allocator.h
+++ /dev/null
@@ -1,59 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_UTIL_MEMORY_POOL_H
-#define PARQUET_UTIL_MEMORY_POOL_H
-
-#include <cstdint>
-#include <mutex>
-
-#include "parquet/util/visibility.h"
-
-namespace parquet {
-
-class PARQUET_EXPORT MemoryAllocator {
- public:
- virtual ~MemoryAllocator();
-
- // Returns nullptr if size is 0
- virtual uint8_t* Malloc(int64_t size) = 0;
- virtual void Free(uint8_t* p, int64_t size) = 0;
-};
-
-PARQUET_EXPORT MemoryAllocator* default_allocator();
-
-class PARQUET_EXPORT TrackingAllocator : public MemoryAllocator {
- public:
- TrackingAllocator() : total_memory_(0), max_memory_(0) {}
- virtual ~TrackingAllocator();
-
- uint8_t* Malloc(int64_t size) override;
- void Free(uint8_t* p, int64_t size) override;
-
- int64_t TotalMemory() { return total_memory_; }
-
- int64_t MaxMemory() { return max_memory_; }
-
- private:
- std::mutex stats_mutex_;
- int64_t total_memory_;
- int64_t max_memory_;
-};
-
-} // namespace parquet
-
-#endif // PARQUET_UTIL_MEMORY_POOL_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/mem-pool-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-pool-test.cc b/src/parquet/util/mem-pool-test.cc
deleted file mode 100644
index 3f3424b..0000000
--- a/src/parquet/util/mem-pool-test.cc
+++ /dev/null
@@ -1,247 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Initially imported from Apache Impala on 2016-02-23, and has been modified
-// since for parquet-cpp
-
-#include <cstdint>
-#include <gtest/gtest.h>
-#include <limits>
-#include <string>
-
-#include "parquet/util/bit-util.h"
-#include "parquet/util/mem-pool.h"
-
-namespace parquet {
-
-// Utility class to call private functions on MemPool.
-class MemPoolTest {
- public:
- static bool CheckIntegrity(MemPool* pool, bool current_chunk_empty) {
- return pool->CheckIntegrity(current_chunk_empty);
- }
-
- static const int INITIAL_CHUNK_SIZE = MemPool::INITIAL_CHUNK_SIZE;
- static const int MAX_CHUNK_SIZE = MemPool::MAX_CHUNK_SIZE;
-};
-
-const int MemPoolTest::INITIAL_CHUNK_SIZE;
-const int MemPoolTest::MAX_CHUNK_SIZE;
-
-TEST(MemPoolTest, Basic) {
- MemPool p;
- MemPool p2;
- MemPool p3;
-
- for (int iter = 0; iter < 2; ++iter) {
- // allocate a total of 24K in 32-byte pieces (for which we only request 25 bytes)
- for (int i = 0; i < 768; ++i) {
- // pads to 32 bytes
- p.Allocate(25);
- }
- // we handed back 24K
- EXPECT_EQ(24 * 1024, p.total_allocated_bytes());
- // .. and allocated 28K of chunks (4, 8, 16)
- EXPECT_EQ(28 * 1024, p.GetTotalChunkSizes());
-
- // we're passing on the first two chunks, containing 12K of data; we're left with
- // one chunk of 16K containing 12K of data
- p2.AcquireData(&p, true);
- EXPECT_EQ(12 * 1024, p.total_allocated_bytes());
- EXPECT_EQ(16 * 1024, p.GetTotalChunkSizes());
-
- // we allocate 8K, for which there isn't enough room in the current chunk,
- // so another one is allocated (32K)
- p.Allocate(8 * 1024);
- EXPECT_EQ((16 + 32) * 1024, p.GetTotalChunkSizes());
-
- // we allocate 65K, which doesn't fit into the current chunk or the default
- // size of the next allocated chunk (64K)
- p.Allocate(65 * 1024);
- EXPECT_EQ((12 + 8 + 65) * 1024, p.total_allocated_bytes());
- if (iter == 0) {
- EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
- } else {
- EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
- }
- EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
-
- // Clear() resets allocated data, but doesn't remove any chunks
- p.Clear();
- EXPECT_EQ(0, p.total_allocated_bytes());
- if (iter == 0) {
- EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
- } else {
- EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
- }
- EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
-
- // next allocation reuses existing chunks
- p.Allocate(1024);
- EXPECT_EQ(1024, p.total_allocated_bytes());
- if (iter == 0) {
- EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
- } else {
- EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
- }
- EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
-
- // ... unless it doesn't fit into any available chunk
- p.Allocate(120 * 1024);
- EXPECT_EQ((1 + 120) * 1024, p.total_allocated_bytes());
- if (iter == 0) {
- EXPECT_EQ((1 + 120) * 1024, p.peak_allocated_bytes());
- } else {
- EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
- }
- EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
-
- // ... Try another chunk that fits into an existing chunk
- p.Allocate(33 * 1024);
- EXPECT_EQ((1 + 120 + 33) * 1024, p.total_allocated_bytes());
- EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
-
- // we're releasing 3 chunks, which get added to p2
- p2.AcquireData(&p, false);
- EXPECT_EQ(0, p.total_allocated_bytes());
- EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
- EXPECT_EQ(0, p.GetTotalChunkSizes());
-
- p3.AcquireData(&p2, true); // we're keeping the 65k chunk
- EXPECT_EQ(33 * 1024, p2.total_allocated_bytes());
- EXPECT_EQ(65 * 1024, p2.GetTotalChunkSizes());
-
- p.FreeAll();
- p2.FreeAll();
- p3.FreeAll();
- }
-}
-
-// Test that we can keep an allocated chunk and a free chunk.
-// This case verifies that when chunks are acquired by another memory pool the
-// remaining chunks are consistent if there were more than one used chunk and some
-// free chunks.
-TEST(MemPoolTest, Keep) {
- MemPool p;
- p.Allocate(4 * 1024);
- p.Allocate(8 * 1024);
- p.Allocate(16 * 1024);
- EXPECT_EQ((4 + 8 + 16) * 1024, p.total_allocated_bytes());
- EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
- p.Clear();
- EXPECT_EQ(0, p.total_allocated_bytes());
- EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
- p.Allocate(1 * 1024);
- p.Allocate(4 * 1024);
- EXPECT_EQ((1 + 4) * 1024, p.total_allocated_bytes());
- EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
-
- MemPool p2;
- p2.AcquireData(&p, true);
- EXPECT_EQ(4 * 1024, p.total_allocated_bytes());
- EXPECT_EQ((8 + 16) * 1024, p.GetTotalChunkSizes());
- EXPECT_EQ(1 * 1024, p2.total_allocated_bytes());
- EXPECT_EQ(4 * 1024, p2.GetTotalChunkSizes());
-
- p.FreeAll();
- p2.FreeAll();
-}
-
-// Tests that we can return partial allocations.
-TEST(MemPoolTest, ReturnPartial) {
- MemPool p;
- uint8_t* ptr = p.Allocate(1024);
- EXPECT_EQ(1024, p.total_allocated_bytes());
- memset(ptr, 0, 1024);
- p.ReturnPartialAllocation(1024);
-
- uint8_t* ptr2 = p.Allocate(1024);
- EXPECT_EQ(1024, p.total_allocated_bytes());
- EXPECT_TRUE(ptr == ptr2);
- p.ReturnPartialAllocation(1016);
-
- ptr2 = p.Allocate(1016);
- EXPECT_EQ(1024, p.total_allocated_bytes());
- EXPECT_TRUE(ptr2 == ptr + 8);
- p.ReturnPartialAllocation(512);
- memset(ptr2, 1, 1016 - 512);
-
- uint8_t* ptr3 = p.Allocate(512);
- EXPECT_EQ(1024, p.total_allocated_bytes());
- EXPECT_TRUE(ptr3 == ptr + 512);
- memset(ptr3, 2, 512);
-
- for (int i = 0; i < 8; ++i) {
- EXPECT_EQ(0, ptr[i]);
- }
- for (int i = 8; i < 512; ++i) {
- EXPECT_EQ(1, ptr[i]);
- }
- for (int i = 512; i < 1024; ++i) {
- EXPECT_EQ(2, ptr[i]);
- }
-
- p.FreeAll();
-}
-
-// Test that the MemPool overhead is bounded when we make allocations of
-// INITIAL_CHUNK_SIZE.
-TEST(MemPoolTest, MemoryOverhead) {
- MemPool p;
- const int alloc_size = MemPoolTest::INITIAL_CHUNK_SIZE;
- const int num_allocs = 1000;
- int64_t total_allocated = 0;
-
- for (int i = 0; i < num_allocs; ++i) {
- uint8_t* mem = p.Allocate(alloc_size);
- ASSERT_TRUE(mem != NULL);
- total_allocated += alloc_size;
-
- int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
- // The initial chunk fits evenly into MAX_CHUNK_SIZE, so should have at most
- // one empty chunk at the end.
- EXPECT_LE(wasted_memory, MemPoolTest::MAX_CHUNK_SIZE);
- // The chunk doubling algorithm should not allocate chunks larger than the total
- // amount of memory already allocated.
- EXPECT_LE(wasted_memory, total_allocated);
- }
-
- p.FreeAll();
-}
-
-// Test that the MemPool overhead is bounded when we make alternating large and small
-// allocations.
-TEST(MemPoolTest, FragmentationOverhead) {
- MemPool p;
- const int num_allocs = 100;
- int64_t total_allocated = 0;
-
- for (int i = 0; i < num_allocs; ++i) {
- int alloc_size = i % 2 == 0 ? 1 : MemPoolTest::MAX_CHUNK_SIZE;
- uint8_t* mem = p.Allocate(alloc_size);
- ASSERT_TRUE(mem != NULL);
- total_allocated += alloc_size;
-
- int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
- // Fragmentation should not waste more than half of each completed chunk.
- EXPECT_LE(wasted_memory, total_allocated + MemPoolTest::MAX_CHUNK_SIZE);
- }
-
- p.FreeAll();
-}
-
-} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/mem-pool.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-pool.cc b/src/parquet/util/mem-pool.cc
deleted file mode 100644
index 1ab40bc..0000000
--- a/src/parquet/util/mem-pool.cc
+++ /dev/null
@@ -1,264 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Initially imported from Apache Impala on 2016-02-23, and has been modified
-// since for parquet-cpp
-
-#include "parquet/util/mem-pool.h"
-
-#include <stdio.h>
-
-#include <algorithm>
-#include <cstdint>
-#include <sstream>
-#include <string>
-
-#include "parquet/util/bit-util.h"
-#include "parquet/util/logging.h"
-
-namespace parquet {
-
-const int MemPool::INITIAL_CHUNK_SIZE;
-const int MemPool::MAX_CHUNK_SIZE;
-
-MemPool::MemPool(MemoryAllocator* allocator)
- : current_chunk_idx_(-1),
- next_chunk_size_(INITIAL_CHUNK_SIZE),
- total_allocated_bytes_(0),
- peak_allocated_bytes_(0),
- total_reserved_bytes_(0),
- allocator_(allocator) {}
-
-MemPool::ChunkInfo::ChunkInfo(int64_t size, uint8_t* buf)
- : data(buf), size(size), allocated_bytes(0) {}
-
-MemPool::~MemPool() {
- int64_t total_bytes_released = 0;
- for (size_t i = 0; i < chunks_.size(); ++i) {
- total_bytes_released += chunks_[i].size;
- allocator_->Free(chunks_[i].data, chunks_[i].size);
- }
-
- DCHECK(chunks_.empty()) << "Must call FreeAll() or AcquireData() for this pool";
-}
-
-void MemPool::ReturnPartialAllocation(int byte_size) {
- DCHECK_GE(byte_size, 0);
- DCHECK(current_chunk_idx_ != -1);
- ChunkInfo& info = chunks_[current_chunk_idx_];
- DCHECK_GE(info.allocated_bytes, byte_size);
- info.allocated_bytes -= byte_size;
- total_allocated_bytes_ -= byte_size;
-}
-
-template <bool CHECK_LIMIT_FIRST>
-uint8_t* MemPool::Allocate(int size) {
- if (size == 0) return NULL;
-
- int64_t num_bytes = BitUtil::RoundUp(size, 8);
- if (current_chunk_idx_ == -1 ||
- num_bytes + chunks_[current_chunk_idx_].allocated_bytes >
- chunks_[current_chunk_idx_].size) {
- // If we couldn't allocate a new chunk, return NULL.
- if (UNLIKELY(!FindChunk(num_bytes))) return NULL;
- }
- ChunkInfo& info = chunks_[current_chunk_idx_];
- uint8_t* result = info.data + info.allocated_bytes;
- DCHECK_LE(info.allocated_bytes + num_bytes, info.size);
- info.allocated_bytes += num_bytes;
- total_allocated_bytes_ += num_bytes;
- DCHECK_LE(current_chunk_idx_, static_cast<int>(chunks_.size()) - 1);
- peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
- return result;
-}
-
-uint8_t* MemPool::Allocate(int size) {
- return Allocate<false>(size);
-}
-
-void MemPool::Clear() {
- current_chunk_idx_ = -1;
- for (auto chunk = chunks_.begin(); chunk != chunks_.end(); ++chunk) {
- chunk->allocated_bytes = 0;
- }
- total_allocated_bytes_ = 0;
- DCHECK(CheckIntegrity(false));
-}
-
-void MemPool::FreeAll() {
- int64_t total_bytes_released = 0;
- for (size_t i = 0; i < chunks_.size(); ++i) {
- total_bytes_released += chunks_[i].size;
- allocator_->Free(chunks_[i].data, chunks_[i].size);
- }
- chunks_.clear();
- next_chunk_size_ = INITIAL_CHUNK_SIZE;
- current_chunk_idx_ = -1;
- total_allocated_bytes_ = 0;
- total_reserved_bytes_ = 0;
-}
-
-bool MemPool::FindChunk(int64_t min_size) {
- // Try to allocate from a free chunk. The first free chunk, if any, will be immediately
- // after the current chunk.
- int first_free_idx = current_chunk_idx_ + 1;
- // (cast size() to signed int in order to avoid everything else being cast to
- // unsigned long, in particular -1)
- while (++current_chunk_idx_ < static_cast<int>(chunks_.size())) {
- // we found a free chunk
- DCHECK_EQ(chunks_[current_chunk_idx_].allocated_bytes, 0);
-
- if (chunks_[current_chunk_idx_].size >= min_size) {
- // This chunk is big enough. Move it before the other free chunks.
- if (current_chunk_idx_ != first_free_idx) {
- std::swap(chunks_[current_chunk_idx_], chunks_[first_free_idx]);
- current_chunk_idx_ = first_free_idx;
- }
- break;
- }
- }
-
- if (current_chunk_idx_ == static_cast<int>(chunks_.size())) {
- // need to allocate new chunk.
- int64_t chunk_size;
- DCHECK_GE(next_chunk_size_, INITIAL_CHUNK_SIZE);
- DCHECK_LE(next_chunk_size_, MAX_CHUNK_SIZE);
-
- chunk_size = std::max<int64_t>(min_size, next_chunk_size_);
-
- // Allocate a new chunk. Return early if malloc fails.
- uint8_t* buf = allocator_->Malloc(chunk_size);
- if (UNLIKELY(buf == NULL)) {
- DCHECK_EQ(current_chunk_idx_, static_cast<int>(chunks_.size()));
- current_chunk_idx_ = static_cast<int>(chunks_.size()) - 1;
- return false;
- }
-
- // If there are no free chunks put it at the end, otherwise before the first free.
- if (first_free_idx == static_cast<int>(chunks_.size())) {
- chunks_.push_back(ChunkInfo(chunk_size, buf));
- } else {
- current_chunk_idx_ = first_free_idx;
- auto insert_chunk = chunks_.begin() + current_chunk_idx_;
- chunks_.insert(insert_chunk, ChunkInfo(chunk_size, buf));
- }
- total_reserved_bytes_ += chunk_size;
- // Don't increment the chunk size until the allocation succeeds: if an attempted
- // large allocation fails we don't want to increase the chunk size further.
- next_chunk_size_ =
- static_cast<int>(std::min<int64_t>(chunk_size * 2, MAX_CHUNK_SIZE));
- }
-
- DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size()));
- DCHECK(CheckIntegrity(true));
- return true;
-}
-
-void MemPool::AcquireData(MemPool* src, bool keep_current) {
- DCHECK(src->CheckIntegrity(false));
- int num_acquired_chunks;
- if (keep_current) {
- num_acquired_chunks = src->current_chunk_idx_;
- } else if (src->GetFreeOffset() == 0) {
- // nothing in the last chunk
- num_acquired_chunks = src->current_chunk_idx_;
- } else {
- num_acquired_chunks = src->current_chunk_idx_ + 1;
- }
-
- if (num_acquired_chunks <= 0) {
- if (!keep_current) src->FreeAll();
- return;
- }
-
- auto end_chunk = src->chunks_.begin() + num_acquired_chunks;
- int64_t total_transfered_bytes = 0;
- for (auto i = src->chunks_.begin(); i != end_chunk; ++i) {
- total_transfered_bytes += i->size;
- }
- src->total_reserved_bytes_ -= total_transfered_bytes;
- total_reserved_bytes_ += total_transfered_bytes;
-
- // insert new chunks after current_chunk_idx_
- auto insert_chunk = chunks_.begin() + current_chunk_idx_ + 1;
- chunks_.insert(insert_chunk, src->chunks_.begin(), end_chunk);
- src->chunks_.erase(src->chunks_.begin(), end_chunk);
- current_chunk_idx_ += num_acquired_chunks;
-
- if (keep_current) {
- src->current_chunk_idx_ = 0;
- DCHECK(src->chunks_.size() == 1 || src->chunks_[1].allocated_bytes == 0);
- total_allocated_bytes_ += src->total_allocated_bytes_ - src->GetFreeOffset();
- src->total_allocated_bytes_ = src->GetFreeOffset();
- } else {
- src->current_chunk_idx_ = -1;
- total_allocated_bytes_ += src->total_allocated_bytes_;
- src->total_allocated_bytes_ = 0;
- }
- peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
-
- if (!keep_current) src->FreeAll();
- DCHECK(CheckIntegrity(false));
-}
-
-std::string MemPool::DebugString() {
- std::stringstream out;
- char str[16];
- out << "MemPool(#chunks=" << chunks_.size() << " [";
- for (size_t i = 0; i < chunks_.size(); ++i) {
- sprintf(str, "0x%lx=", reinterpret_cast<size_t>(chunks_[i].data)); // NOLINT
- out << (i > 0 ? " " : "") << str << chunks_[i].size << "/"
- << chunks_[i].allocated_bytes;
- }
- out << "] current_chunk=" << current_chunk_idx_
- << " total_sizes=" << GetTotalChunkSizes()
- << " total_alloc=" << total_allocated_bytes_ << ")";
- return out.str();
-}
-
-int64_t MemPool::GetTotalChunkSizes() const {
- int64_t result = 0;
- for (size_t i = 0; i < chunks_.size(); ++i) {
- result += chunks_[i].size;
- }
- return result;
-}
-
-bool MemPool::CheckIntegrity(bool current_chunk_empty) {
- // check that current_chunk_idx_ points to the last chunk with allocated data
- DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size()));
- int64_t total_allocated = 0;
- for (int i = 0; i < static_cast<int>(chunks_.size()); ++i) {
- DCHECK_GT(chunks_[i].size, 0);
- if (i < current_chunk_idx_) {
- DCHECK_GT(chunks_[i].allocated_bytes, 0);
- } else if (i == current_chunk_idx_) {
- if (current_chunk_empty) {
- DCHECK_EQ(chunks_[i].allocated_bytes, 0);
- } else {
- DCHECK_GT(chunks_[i].allocated_bytes, 0);
- }
- } else {
- DCHECK_EQ(chunks_[i].allocated_bytes, 0);
- }
- total_allocated += chunks_[i].allocated_bytes;
- }
- DCHECK_EQ(total_allocated, total_allocated_bytes_);
- return true;
-}
-
-} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/mem-pool.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-pool.h b/src/parquet/util/mem-pool.h
deleted file mode 100644
index 5f6afa9..0000000
--- a/src/parquet/util/mem-pool.h
+++ /dev/null
@@ -1,179 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Initially imported from Apache Impala on 2016-02-23, and has been modified
-// since for parquet-cpp
-
-#ifndef PARQUET_UTIL_MEM_POOL_H
-#define PARQUET_UTIL_MEM_POOL_H
-
-#include <algorithm>
-#include <cstdint>
-#include <stdio.h>
-#include <string>
-#include <vector>
-
-#include "parquet/util/mem-allocator.h"
-
-namespace parquet {
-
-/// A MemPool maintains a list of memory chunks from which it allocates memory
-/// in response to Allocate() calls;
-/// Chunks stay around for the lifetime of the mempool or until they are passed on to
-/// another mempool.
-//
-/// An Allocate() call will attempt to allocate memory from the chunk that was most
-/// recently added; if that chunk doesn't have enough memory to
-/// satisfy the allocation request, the free chunks are searched for one that is
-/// big enough otherwise a new chunk is added to the list.
-/// The current_chunk_idx_ always points to the last chunk with allocated memory.
-/// In order to keep allocation overhead low, chunk sizes double with each new one
-/// added, until they hit a maximum size.
-//
-/// Example:
-/// MemPool* p = new MemPool();
-/// for (int i = 0; i < 1024; ++i) {
-/// returns 8-byte aligned memory (effectively 24 bytes):
-/// .. = p->Allocate(17);
-/// }
-/// at this point, 24K (1024 * 24 bytes, for 17K requested) have been handed out in
-/// response to Allocate() calls and 28K of chunks have been allocated
-/// (chunk sizes: 4K, 8K, 16K).
-/// We track total and peak allocated bytes. At this point they would be the same:
-/// 24K bytes. A call to Clear() will return the allocated memory, so
-/// total_allocated_bytes_
-/// becomes 0 while peak_allocated_bytes_ remains at 24K.
-/// p->Clear();
-/// the entire 1st chunk is returned:
-/// .. = p->Allocate(4 * 1024);
-/// 4K of the 2nd chunk are returned:
-/// .. = p->Allocate(4 * 1024);
-/// neither the rest of the 2nd chunk nor the free 16K chunk can hold 20K, so a new
-/// chunk of the next chunk size (32K) is created:
-/// .. = p->Allocate(20 * 1024);
-//
-/// MemPool* p2 = new MemPool();
-/// the new mempool receives all chunks containing data from p
-/// p2->AcquireData(p, false);
-/// At this point p.total_allocated_bytes_ would be 0 while p.peak_allocated_bytes_
-/// remains unchanged.
-/// With keep_current == false, AcquireData() has already released p's one
-/// remaining (empty) chunk, so nothing is left to free when p is deleted:
-/// delete p;
-
-class MemPool {
- public:
- explicit MemPool(MemoryAllocator* allocator = default_allocator());
-
- /// Frees all chunks of memory. (The Impala original also subtracted the total
- /// allocated bytes from registered memory limits; this class declares none.)
- ~MemPool();
-
- /// Allocates 8-byte aligned section of memory of 'size' bytes at the end
- /// of the current chunk. Creates a new chunk if there aren't any chunks
- /// with enough capacity.
- uint8_t* Allocate(int size);
-
- /// Returns the last 'byte_size' bytes of the current chunk back to the mem pool.
- /// This can only be used to return either all or part of the previous allocation
- /// returned by Allocate().
- void ReturnPartialAllocation(int byte_size);
-
- /// Makes all allocated chunks available for re-use, but doesn't delete any chunks.
- void Clear();
-
- /// Deletes all allocated chunks. FreeAll() or AcquireData() must be called for
- /// each mem pool.
- void FreeAll();
-
- /// Absorb all chunks that hold data from src. If keep_current is true, let src hold on
- /// to its last allocated chunk that contains data.
- /// NOTE(review): GetCurrentOffset() is not declared in this class; the warning that
- /// offsets it handed out for 'src' become invalid is inherited from Impala.
- void AcquireData(MemPool* src, bool keep_current);
-
- /// Returns a human-readable dump of every chunk's address, size and used bytes.
- std::string DebugString();
-
- int64_t total_allocated_bytes() const { return total_allocated_bytes_; }
- int64_t peak_allocated_bytes() const { return peak_allocated_bytes_; }
- int64_t total_reserved_bytes() const { return total_reserved_bytes_; }
-
- /// Return the sum of the sizes of all chunks in chunks_, whether used or free.
- int64_t GetTotalChunkSizes() const;
-
- private:
- friend class MemPoolTest;
- /// Size of the first chunk; later chunks double in size up to MAX_CHUNK_SIZE.
- static const int INITIAL_CHUNK_SIZE = 4 * 1024;
-
- /// The maximum size of chunk that should be allocated. Allocations larger than this
- /// size will get their own individual chunk.
- static const int MAX_CHUNK_SIZE = 1024 * 1024;
-
- struct ChunkInfo {
- uint8_t* data; // Owned by the ChunkInfo.
- int64_t size; // in bytes
-
- /// bytes allocated via Allocate() in this chunk
- int64_t allocated_bytes;
-
- explicit ChunkInfo(int64_t size, uint8_t* buf);
-
- ChunkInfo() : data(NULL), size(0), allocated_bytes(0) {}
- };
-
- /// chunk from which we served the last Allocate() call;
- /// always points to the last chunk that contains allocated data;
- /// chunks 0..current_chunk_idx_ are guaranteed to contain data
- /// (chunks_[i].allocated_bytes > 0 for i: 0..current_chunk_idx_);
- /// -1 if no chunk holds data (chunks may still be present but all free)
- int current_chunk_idx_;
-
- /// The size of the next chunk to allocate.
- int64_t next_chunk_size_;
-
- /// sum of chunks_[i].allocated_bytes over all chunks
- int64_t total_allocated_bytes_;
-
- /// Maximum number of bytes allocated from this pool at one time.
- int64_t peak_allocated_bytes_;
-
- /// presumably the sum of chunks_[i].size, i.e. bytes reserved from allocator_
- int64_t total_reserved_bytes_;
-
- std::vector<ChunkInfo> chunks_;
-
- MemoryAllocator* allocator_;
-
- /// Find or allocate a chunk with at least min_size spare capacity and update
- /// current_chunk_idx_. Also updates chunks_ if a new chunk needs to be created.
- /// Returns false on failure (presumably when a new chunk cannot be allocated).
- bool FindChunk(int64_t min_size);
-
- /// Check integrity of the supporting data structures; always returns true but DCHECKs
- /// all invariants.
- /// If 'current_chunk_empty' is true, checks that the current chunk is empty;
- /// otherwise checks that it contains data.
- bool CheckIntegrity(bool current_chunk_empty);
-
- /// Return offset to unoccupied space in current chunk (0 if there is none).
- /// NOTE(review): narrows the int64_t allocated_bytes to int.
- int GetFreeOffset() const {
- if (current_chunk_idx_ == -1) return 0;
- return chunks_[current_chunk_idx_].allocated_bytes;
- }
-
- /// Allocation routine presumably behind the public Allocate(int); the meaning of
- /// CHECK_LIMIT_FIRST depends on the definition, which is not shown here.
- template <bool CHECK_LIMIT_FIRST>
- uint8_t* Allocate(int size);
-};
-
-} // namespace parquet
-
-#endif // PARQUET_UTIL_MEM_POOL_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/memory-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory-test.cc b/src/parquet/util/memory-test.cc
new file mode 100644
index 0000000..45aa819
--- /dev/null
+++ b/src/parquet/util/memory-test.cc
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdio>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "parquet/exception.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/test-common.h"
+
+namespace parquet {
+
+// NOTE(review): no TEST_F in this file uses this fixture; it appears to be unused.
+class TestBuffer : public ::testing::Test {};
+
+// Basic TrackingAllocator contract: a normal allocation is writable, a zero-byte
+// allocation yields nullptr, and an impossible request fails with a non-OK Status.
+TEST(TestAllocator, AllocateFree) {
+ TrackingAllocator allocator;
+
+ uint8_t* data;
+
+ ASSERT_TRUE(allocator.Allocate(100, &data).ok());
+ ASSERT_TRUE(nullptr != data);
+ // Touch the last byte to check the full 100 bytes are usable.
+ data[99] = 55;
+ allocator.Free(data, 100);
+
+ // Zero-byte requests succeed and hand back nullptr.
+ ASSERT_TRUE(allocator.Allocate(0, &data).ok());
+ ASSERT_EQ(nullptr, data);
+ allocator.Free(data, 0);
+
+ // INT64_MAX bytes cannot be satisfied, so the allocator must report failure.
+ // NOTE(review): std::numeric_limits needs <limits>, which this file does not
+ // include directly.
+ int64_t to_alloc = std::numeric_limits<int64_t>::max();
+ ASSERT_FALSE(allocator.Allocate(to_alloc, &data).ok());
+}
+
+// bytes_allocated() follows the live total, while max_memory() keeps the
+// high-water mark even after everything has been freed.
+TEST(TestAllocator, TotalMax) {
+ TrackingAllocator allocator;
+ ASSERT_EQ(0, allocator.bytes_allocated());
+ ASSERT_EQ(0, allocator.max_memory());
+
+ uint8_t* data;
+ uint8_t* data2;
+ ASSERT_TRUE(allocator.Allocate(100, &data).ok());
+ ASSERT_EQ(100, allocator.bytes_allocated());
+ ASSERT_EQ(100, allocator.max_memory());
+
+ ASSERT_TRUE(allocator.Allocate(10, &data2).ok());
+ ASSERT_EQ(110, allocator.bytes_allocated());
+ ASSERT_EQ(110, allocator.max_memory());
+
+ // Freeing lowers the live total but not the peak.
+ allocator.Free(data, 100);
+ ASSERT_EQ(10, allocator.bytes_allocated());
+ ASSERT_EQ(110, allocator.max_memory());
+
+ allocator.Free(data2, 10);
+ ASSERT_EQ(0, allocator.bytes_allocated());
+ ASSERT_EQ(110, allocator.max_memory());
+}
+
+// Utility class to call private functions on ChunkedAllocator and to expose its
+// chunk-size constants to the tests below.
+// NOTE(review): presumably declared a friend in parquet/util/memory.h; confirm.
+class ChunkedAllocatorTest {
+ public:
+ static bool CheckIntegrity(ChunkedAllocator* pool, bool current_chunk_empty) {
+ return pool->CheckIntegrity(current_chunk_empty);
+ }
+
+ static const int INITIAL_CHUNK_SIZE = ChunkedAllocator::INITIAL_CHUNK_SIZE;
+ static const int MAX_CHUNK_SIZE = ChunkedAllocator::MAX_CHUNK_SIZE;
+};
+
+// Out-of-class definitions for the constants above; needed (pre-C++17) when they
+// are ODR-used, e.g. bound to a const reference by EXPECT_LE.
+const int ChunkedAllocatorTest::INITIAL_CHUNK_SIZE;
+const int ChunkedAllocatorTest::MAX_CHUNK_SIZE;
+
+// Walks a ChunkedAllocator through the following steps:
+// - chunk growth (4K, 8K, 16K, ... doubling);
+// - AcquireData() with and without keep_current;
+// - Clear() and reuse of free chunks.
+// At each step it checks total/peak allocated bytes and total chunk sizes. The
+// sequence runs twice. On the second pass peak_allocated_bytes() starts from the
+// (1 + 120 + 33)K high-water mark of the first, since FreeAll() does not reset it.
+TEST(ChunkedAllocatorTest, Basic) {
+ ChunkedAllocator p;
+ ChunkedAllocator p2;
+ ChunkedAllocator p3;
+
+ for (int iter = 0; iter < 2; ++iter) {
+ // allocate a total of 24K in 32-byte pieces (for which we only request 25 bytes)
+ for (int i = 0; i < 768; ++i) {
+ // pads to 32 bytes
+ p.Allocate(25);
+ }
+ // we handed back 24K
+ EXPECT_EQ(24 * 1024, p.total_allocated_bytes());
+ // .. and allocated 28K of chunks (4, 8, 16)
+ EXPECT_EQ(28 * 1024, p.GetTotalChunkSizes());
+
+ // we're passing on the first two chunks, containing 12K of data; we're left with
+ // one chunk of 16K containing 12K of data
+ p2.AcquireData(&p, true);
+ EXPECT_EQ(12 * 1024, p.total_allocated_bytes());
+ EXPECT_EQ(16 * 1024, p.GetTotalChunkSizes());
+
+ // we allocate 8K, for which there isn't enough room in the current chunk,
+ // so another one is allocated (32K)
+ p.Allocate(8 * 1024);
+ EXPECT_EQ((16 + 32) * 1024, p.GetTotalChunkSizes());
+
+ // we allocate 65K, which doesn't fit into the current chunk or the default
+ // size of the next allocated chunk (64K)
+ p.Allocate(65 * 1024);
+ EXPECT_EQ((12 + 8 + 65) * 1024, p.total_allocated_bytes());
+ if (iter == 0) {
+ EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
+ } else {
+ EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+ }
+ EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+ // Clear() resets allocated data, but doesn't remove any chunks
+ p.Clear();
+ EXPECT_EQ(0, p.total_allocated_bytes());
+ if (iter == 0) {
+ EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
+ } else {
+ EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+ }
+ EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+ // next allocation reuses existing chunks
+ p.Allocate(1024);
+ EXPECT_EQ(1024, p.total_allocated_bytes());
+ if (iter == 0) {
+ EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
+ } else {
+ EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+ }
+ EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+ // ... unless it doesn't fit into any available chunk
+ // (the new chunk is max(120K, next chunk size of 130K) = 130K)
+ p.Allocate(120 * 1024);
+ EXPECT_EQ((1 + 120) * 1024, p.total_allocated_bytes());
+ if (iter == 0) {
+ EXPECT_EQ((1 + 120) * 1024, p.peak_allocated_bytes());
+ } else {
+ EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+ }
+ EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+ // ... Try another chunk that fits into an existing chunk
+ p.Allocate(33 * 1024);
+ EXPECT_EQ((1 + 120 + 33) * 1024, p.total_allocated_bytes());
+ EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+ // we're releasing 3 chunks, which get added to p2
+ p2.AcquireData(&p, false);
+ EXPECT_EQ(0, p.total_allocated_bytes());
+ EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+ EXPECT_EQ(0, p.GetTotalChunkSizes());
+
+ p3.AcquireData(&p2, true); // we're keeping the 65k chunk
+ EXPECT_EQ(33 * 1024, p2.total_allocated_bytes());
+ EXPECT_EQ(65 * 1024, p2.GetTotalChunkSizes());
+
+ p.FreeAll();
+ p2.FreeAll();
+ p3.FreeAll();
+ }
+}
+
+// Test that we can keep an allocated chunk and a free chunk.
+// This case verifies that when chunks are acquired by another memory pool the
+// remaining chunks are consistent if there were more than one used chunk and some
+// free chunks.
+TEST(ChunkedAllocatorTest, Keep) {
+ ChunkedAllocator p;
+ p.Allocate(4 * 1024);
+ p.Allocate(8 * 1024);
+ p.Allocate(16 * 1024);
+ EXPECT_EQ((4 + 8 + 16) * 1024, p.total_allocated_bytes());
+ EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
+ p.Clear();
+ EXPECT_EQ(0, p.total_allocated_bytes());
+ EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
+ // 1K lands in the 4K chunk; 4K no longer fits there and moves on to the 8K chunk,
+ // leaving the 16K chunk free.
+ p.Allocate(1 * 1024);
+ p.Allocate(4 * 1024);
+ EXPECT_EQ((1 + 4) * 1024, p.total_allocated_bytes());
+ EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
+
+ // keep_current: p2 takes only the 4K chunk; p keeps the current 8K chunk and
+ // the free 16K chunk.
+ ChunkedAllocator p2;
+ p2.AcquireData(&p, true);
+ EXPECT_EQ(4 * 1024, p.total_allocated_bytes());
+ EXPECT_EQ((8 + 16) * 1024, p.GetTotalChunkSizes());
+ EXPECT_EQ(1 * 1024, p2.total_allocated_bytes());
+ EXPECT_EQ(4 * 1024, p2.GetTotalChunkSizes());
+
+ p.FreeAll();
+ p2.FreeAll();
+}
+
+// Tests that we can return partial allocations.
+// NOTE(review): memset needs <cstring>, which this file does not include directly.
+TEST(ChunkedAllocatorTest, ReturnPartial) {
+ ChunkedAllocator p;
+ uint8_t* ptr = p.Allocate(1024);
+ EXPECT_EQ(1024, p.total_allocated_bytes());
+ memset(ptr, 0, 1024);
+ p.ReturnPartialAllocation(1024);
+
+ // Returning the whole allocation makes the same address come back.
+ uint8_t* ptr2 = p.Allocate(1024);
+ EXPECT_EQ(1024, p.total_allocated_bytes());
+ EXPECT_TRUE(ptr == ptr2);
+ p.ReturnPartialAllocation(1016);
+
+ // Only the first 8 bytes stay allocated, so the next allocation starts at ptr + 8.
+ ptr2 = p.Allocate(1016);
+ EXPECT_EQ(1024, p.total_allocated_bytes());
+ EXPECT_TRUE(ptr2 == ptr + 8);
+ p.ReturnPartialAllocation(512);
+ memset(ptr2, 1, 1016 - 512);
+
+ uint8_t* ptr3 = p.Allocate(512);
+ EXPECT_EQ(1024, p.total_allocated_bytes());
+ EXPECT_TRUE(ptr3 == ptr + 512);
+ memset(ptr3, 2, 512);
+
+ // Byte layout: [0, 8) = 0, [8, 512) = 1, [512, 1024) = 2.
+ for (int i = 0; i < 8; ++i) {
+ EXPECT_EQ(0, ptr[i]);
+ }
+ for (int i = 8; i < 512; ++i) {
+ EXPECT_EQ(1, ptr[i]);
+ }
+ for (int i = 512; i < 1024; ++i) {
+ EXPECT_EQ(2, ptr[i]);
+ }
+
+ p.FreeAll();
+}
+
+// Test that the ChunkedAllocator overhead is bounded when we make allocations of
+// INITIAL_CHUNK_SIZE.
+TEST(ChunkedAllocatorTest, MemoryOverhead) {
+  ChunkedAllocator p;
+  const int alloc_size = ChunkedAllocatorTest::INITIAL_CHUNK_SIZE;
+  const int num_allocs = 1000;
+  int64_t total_allocated = 0;
+
+  for (int i = 0; i < num_allocs; ++i) {
+    uint8_t* mem = p.Allocate(alloc_size);
+    // nullptr (not NULL), consistent with the rest of this file.
+    ASSERT_TRUE(mem != nullptr);
+    total_allocated += alloc_size;
+
+    // Reserved-but-unused bytes across all chunks.
+    int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
+    // The initial chunk fits evenly into MAX_CHUNK_SIZE, so should have at most
+    // one empty chunk at the end.
+    EXPECT_LE(wasted_memory, ChunkedAllocatorTest::MAX_CHUNK_SIZE);
+    // The chunk doubling algorithm should not allocate chunks larger than the total
+    // amount of memory already allocated.
+    EXPECT_LE(wasted_memory, total_allocated);
+  }
+
+  p.FreeAll();
+}
+
+// Test that the ChunkedAllocator overhead is bounded when we make alternating
+// large and small allocations.
+TEST(ChunkedAllocatorTest, FragmentationOverhead) {
+  ChunkedAllocator p;
+  const int num_allocs = 100;
+  int64_t total_allocated = 0;
+
+  for (int i = 0; i < num_allocs; ++i) {
+    // Alternate 1-byte and MAX_CHUNK_SIZE-byte requests.
+    int alloc_size = i % 2 == 0 ? 1 : ChunkedAllocatorTest::MAX_CHUNK_SIZE;
+    uint8_t* mem = p.Allocate(alloc_size);
+    ASSERT_TRUE(mem != nullptr);
+    total_allocated += alloc_size;
+
+    int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
+    // Reserved-but-unused space must stay within the bytes handed out so far plus
+    // at most one MAX_CHUNK_SIZE chunk.
+    EXPECT_LE(wasted_memory, total_allocated + ChunkedAllocatorTest::MAX_CHUNK_SIZE);
+  }
+
+  p.FreeAll();
+}
+
+// Reads a 256-byte source (byte i == i) through a BufferedInputStream that starts at
+// offset 10 with a 50-byte chunk size. Covers peeking, reads spanning a buffer
+// boundary, reads longer than the chunk size, and a read clipped at the source end.
+TEST(TestBufferedInputStream, Basics) {
+ int64_t source_size = 256;
+ int64_t stream_offset = 10;
+ int64_t stream_size = source_size - stream_offset;
+ int64_t chunk_size = 50;
+ std::shared_ptr<PoolBuffer> buf = AllocateBuffer(default_allocator(), source_size);
+ ASSERT_EQ(source_size, buf->size());
+ for (int i = 0; i < source_size; i++) {
+ buf->mutable_data()[i] = i;
+ }
+
+ auto wrapper =
+ std::make_shared<ArrowInputFile>(std::make_shared<::arrow::io::BufferReader>(buf));
+
+ TrackingAllocator allocator;
+ std::unique_ptr<BufferedInputStream> stream(new BufferedInputStream(
+ &allocator, chunk_size, wrapper.get(), stream_offset, stream_size));
+
+ const uint8_t* output;
+ int64_t bytes_read;
+
+ // source is at offset 10
+ // Peek does not consume: the following Read returns the same bytes.
+ output = stream->Peek(10, &bytes_read);
+ ASSERT_EQ(10, bytes_read);
+ for (int i = 0; i < 10; i++) {
+ ASSERT_EQ(10 + i, output[i]) << i;
+ }
+ output = stream->Read(10, &bytes_read);
+ ASSERT_EQ(10, bytes_read);
+ for (int i = 0; i < 10; i++) {
+ ASSERT_EQ(10 + i, output[i]) << i;
+ }
+ output = stream->Read(10, &bytes_read);
+ ASSERT_EQ(10, bytes_read);
+ for (int i = 0; i < 10; i++) {
+ ASSERT_EQ(20 + i, output[i]) << i;
+ }
+ stream->Advance(5);
+ stream->Advance(5);
+ // source is at offset 40
+ // read across buffer boundary. buffer size is 50
+ output = stream->Read(20, &bytes_read);
+ ASSERT_EQ(20, bytes_read);
+ for (int i = 0; i < 20; i++) {
+ ASSERT_EQ(40 + i, output[i]) << i;
+ }
+ // read more than original chunk_size
+ output = stream->Read(60, &bytes_read);
+ ASSERT_EQ(60, bytes_read);
+ for (int i = 0; i < 60; i++) {
+ ASSERT_EQ(60 + i, output[i]) << i;
+ }
+
+ stream->Advance(120);
+ // source is at offset 240
+ // read outside of source boundary. source size is 256
+ output = stream->Read(30, &bytes_read);
+ ASSERT_EQ(16, bytes_read);
+ for (int i = 0; i < 16; i++) {
+ ASSERT_EQ(240 + i, output[i]) << i;
+ }
+}
+
+// ArrowInputFile over an in-memory BufferReader. Covers Tell/Seek, a failing
+// out-of-bounds Seek, and both Read overloads.
+// NOTE(review): std::memcmp needs <cstring>, which this file does not include directly.
+TEST(TestArrowInputFile, Basics) {
+ std::string data = "this is the data";
+ auto data_buffer = reinterpret_cast<const uint8_t*>(data.c_str());
+
+ auto file = std::make_shared<::arrow::io::BufferReader>(data_buffer, data.size());
+ auto source = std::make_shared<ArrowInputFile>(file);
+
+ ASSERT_EQ(0, source->Tell());
+ ASSERT_NO_THROW(source->Seek(5));
+ ASSERT_EQ(5, source->Tell());
+ ASSERT_NO_THROW(source->Seek(0));
+
+ // Seek out of bounds
+ ASSERT_THROW(source->Seek(100), ParquetException);
+
+ uint8_t buffer[50];
+
+ // Read into a caller-provided buffer advances the position.
+ ASSERT_NO_THROW(source->Read(4, buffer));
+ ASSERT_EQ(0, std::memcmp(buffer, "this", 4));
+ ASSERT_EQ(4, source->Tell());
+
+ std::shared_ptr<Buffer> pq_buffer;
+
+ // Buffer-returning Read continues from the current position (offset 4).
+ ASSERT_NO_THROW(pq_buffer = source->Read(7));
+
+ auto expected_buffer = std::make_shared<Buffer>(data_buffer + 4, 7);
+
+ ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get()));
+}
+
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/memory.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory.cc b/src/parquet/util/memory.cc
new file mode 100644
index 0000000..9ad0336
--- /dev/null
+++ b/src/parquet/util/memory.cc
@@ -0,0 +1,543 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/util/memory.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdio>
+#include <string>
+
+#include "parquet/exception.h"
+#include "parquet/types.h"
+#include "parquet/util/bit-util.h"
+#include "parquet/util/logging.h"
+
+namespace parquet {
+
+// Allocates 'size' bytes with std::malloc and stores the pointer in *out.
+// Zero-byte requests succeed with *out == nullptr and are not counted. On malloc
+// failure, returns OutOfMemory and leaves *out untouched. Successful allocations
+// update the live total and the high-water mark under stats_mutex_.
+::arrow::Status TrackingAllocator::Allocate(int64_t size, uint8_t** out) {
+  if (size == 0) {
+    *out = nullptr;
+    return ::arrow::Status::OK();
+  }
+
+  void* const raw = std::malloc(size);
+  if (raw == nullptr) {
+    return ::arrow::Status::OutOfMemory("memory allocation failed");
+  }
+
+  {
+    std::lock_guard<std::mutex> guard(stats_mutex_);
+    total_memory_ += size;
+    if (max_memory_ < total_memory_) { max_memory_ = total_memory_; }
+  }
+
+  *out = static_cast<uint8_t*>(raw);
+  return ::arrow::Status::OK();
+}
+
+// Releases memory obtained from Allocate() and lowers the live byte count.
+// A null pointer or a non-positive size is ignored entirely (nothing is freed).
+void TrackingAllocator::Free(uint8_t* p, int64_t size) {
+  if (p == nullptr || size <= 0) { return; }
+
+  {
+    std::lock_guard<std::mutex> guard(stats_mutex_);
+    DCHECK_GE(total_memory_, size) << "Attempting to free too much memory";
+    total_memory_ -= size;
+  }
+  std::free(p);
+}
+
+// Process-wide TrackingAllocator, created on first use and shared by all callers.
+MemoryAllocator* default_allocator() {
+  static TrackingAllocator shared_tracking_allocator;
+  return &shared_tracking_allocator;
+}
+
+// Creates a vector of 'size' elements backed by a buffer from 'allocator'.
+// The element storage is left as the buffer provides it; it is not value-initialised here.
+template <class T>
+Vector<T>::Vector(int64_t size, MemoryAllocator* allocator)
+    : buffer_(AllocateUniqueBuffer(allocator, size * sizeof(T))),
+      size_(size),
+      capacity_(size) {
+  // Only point into the buffer when something was actually requested.
+  data_ = size > 0 ? reinterpret_cast<T*>(buffer_->mutable_data()) : nullptr;
+}
+
+// Grows the backing buffer to hold at least 'new_capacity' elements; never shrinks.
+// Throws (via PARQUET_THROW_NOT_OK) if the buffer cannot be resized.
+template <class T>
+void Vector<T>::Reserve(int64_t new_capacity) {
+  if (new_capacity <= capacity_) { return; }
+  PARQUET_THROW_NOT_OK(buffer_->Resize(new_capacity * sizeof(T)));
+  // Re-read the data pointer, since Resize may have relocated the storage.
+  data_ = reinterpret_cast<T*>(buffer_->mutable_data());
+  capacity_ = new_capacity;
+}
+
+// Sets the logical size to 'new_size', growing capacity first when required.
+// Newly exposed elements are not initialised.
+template <class T>
+void Vector<T>::Resize(int64_t new_size) {
+  this->Reserve(new_size);
+  size_ = new_size;
+}
+
+// Resizes to exactly 'size' elements and sets every element to 'val'.
+template <class T>
+void Vector<T>::Assign(int64_t size, const T val) {
+  Resize(size);
+  // fill_n writes nothing when size_ <= 0, matching an empty index loop.
+  std::fill_n(data_, size_, val);
+}
+
+// Exchanges contents with 'v' by swapping the buffer and all bookkeeping members.
+template <class T>
+void Vector<T>::Swap(Vector<T>& v) {
+  using std::swap;
+  swap(data_, v.data_);
+  swap(capacity_, v.capacity_);
+  swap(size_, v.size_);
+  buffer_.swap(v.buffer_);
+}
+
+// Explicit instantiations. The Vector<T> member definitions live in this .cc file,
+// so only the element types listed here can be used from other translation units.
+template class Vector<int32_t>;
+template class Vector<int64_t>;
+template class Vector<bool>;
+template class Vector<float>;
+template class Vector<double>;
+template class Vector<Int96>;
+template class Vector<ByteArray>;
+template class Vector<FixedLenByteArray>;
+
+// Out-of-class definitions for the in-class-initialised static constants; needed
+// (pre-C++17) whenever they are ODR-used, e.g. bound to a const reference.
+const int ChunkedAllocator::INITIAL_CHUNK_SIZE;
+const int ChunkedAllocator::MAX_CHUNK_SIZE;
+
+ChunkedAllocator::ChunkedAllocator(MemoryAllocator* allocator)
+ : current_chunk_idx_(-1),
+ next_chunk_size_(INITIAL_CHUNK_SIZE),
+ total_allocated_bytes_(0),
+ peak_allocated_bytes_(0),
+ total_reserved_bytes_(0),
+ allocator_(allocator) {}
+
+ChunkedAllocator::ChunkInfo::ChunkInfo(int64_t size, uint8_t* buf)
+ : data(buf), size(size), allocated_bytes(0) {}
+
+// Returns any chunks still held to allocator_, so a missing FreeAll() does not leak
+// in release builds. Callers are still expected to call FreeAll() or hand the
+// chunks off via AcquireData() first; the DCHECK flags that misuse in debug builds.
+// (The unused 'total_bytes_released' accumulator has been removed.)
+ChunkedAllocator::~ChunkedAllocator() {
+  for (const ChunkInfo& chunk : chunks_) {
+    allocator_->Free(chunk.data, chunk.size);
+  }
+
+  DCHECK(chunks_.empty()) << "Must call FreeAll() or AcquireData() for this pool";
+}
+
+// Gives the trailing 'byte_size' bytes of the current chunk back, so the next
+// Allocate() can reuse them. Only meaningful for (part of) the most recent allocation.
+void ChunkedAllocator::ReturnPartialAllocation(int byte_size) {
+  DCHECK_GE(byte_size, 0);
+  DCHECK(current_chunk_idx_ != -1);
+  ChunkInfo& info = chunks_[current_chunk_idx_];
+  DCHECK_GE(info.allocated_bytes, byte_size);
+  total_allocated_bytes_ -= byte_size;
+  info.allocated_bytes -= byte_size;
+}
+
+// Core allocation routine behind the public Allocate(int).
+// Rounds 'size' up to a multiple of 8 so successive results stay 8-byte aligned
+// relative to the chunk start. When the current chunk lacks room, it moves to (or
+// creates) another chunk via FindChunk(), which may throw.
+// Returns nullptr for a zero-byte request, or when FindChunk() reports failure.
+// NOTE(review): CHECK_LIMIT_FIRST is not used by this implementation.
+template <bool CHECK_LIMIT_FIRST>
+uint8_t* ChunkedAllocator::Allocate(int size) {
+  // A negative size would shrink allocated_bytes and corrupt the bookkeeping.
+  DCHECK_GE(size, 0);
+  if (size == 0) return nullptr;
+
+  int64_t num_bytes = BitUtil::RoundUp(size, 8);
+  if (current_chunk_idx_ == -1 ||
+      num_bytes + chunks_[current_chunk_idx_].allocated_bytes >
+          chunks_[current_chunk_idx_].size) {
+    // If we couldn't allocate a new chunk, return nullptr.
+    if (UNLIKELY(!FindChunk(num_bytes))) return nullptr;
+  }
+  ChunkInfo& info = chunks_[current_chunk_idx_];
+  uint8_t* result = info.data + info.allocated_bytes;
+  DCHECK_LE(info.allocated_bytes + num_bytes, info.size);
+  info.allocated_bytes += num_bytes;
+  total_allocated_bytes_ += num_bytes;
+  DCHECK_LE(current_chunk_idx_, static_cast<int>(chunks_.size()) - 1);
+  peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
+  return result;
+}
+
+// Public entry point: forwards to the Allocate<false> instantiation.
+uint8_t* ChunkedAllocator::Allocate(int size) {
+  uint8_t* const allocation = Allocate<false>(size);
+  return allocation;
+}
+
+// Marks every chunk as empty so it can be reused; no chunk is returned to allocator_.
+void ChunkedAllocator::Clear() {
+  for (ChunkInfo& chunk : chunks_) {
+    chunk.allocated_bytes = 0;
+  }
+  total_allocated_bytes_ = 0;
+  current_chunk_idx_ = -1;
+  DCHECK(CheckIntegrity(false));
+}
+
+// Returns every chunk to allocator_ and resets the allocator to the same empty
+// state the constructor produces. peak_allocated_bytes_ is intentionally kept.
+// (The unused 'total_bytes_released' accumulator has been removed.)
+void ChunkedAllocator::FreeAll() {
+  for (const ChunkInfo& chunk : chunks_) {
+    allocator_->Free(chunk.data, chunk.size);
+  }
+  chunks_.clear();
+  next_chunk_size_ = INITIAL_CHUNK_SIZE;
+  current_chunk_idx_ = -1;
+  total_allocated_bytes_ = 0;
+  total_reserved_bytes_ = 0;
+}
+
+// Makes current_chunk_idx_ refer to a chunk with at least 'min_size' free bytes.
+// Free chunks (those after the current chunk) are searched first. A large-enough
+// one is swapped to the front of the free region and becomes current. Otherwise a
+// new chunk of max(min_size, next_chunk_size_) bytes is allocated and inserted before
+// the free chunks, and next_chunk_size_ doubles (capped at MAX_CHUNK_SIZE).
+// A non-OK status from allocator_->Allocate() is thrown via PARQUET_THROW_NOT_OK.
+// Returns false, with the allocator left exactly as it was, if the allocator
+// reports success but yields a null buffer.
+bool ChunkedAllocator::FindChunk(int64_t min_size) {
+  // Try to allocate from a free chunk. The first free chunk, if any, will be
+  // immediately after the current chunk.
+  int first_free_idx = current_chunk_idx_ + 1;
+  // (cast size() to signed int in order to avoid everything else being cast to
+  // unsigned long, in particular -1)
+  while (++current_chunk_idx_ < static_cast<int>(chunks_.size())) {
+    // we found a free chunk
+    DCHECK_EQ(chunks_[current_chunk_idx_].allocated_bytes, 0);
+
+    if (chunks_[current_chunk_idx_].size >= min_size) {
+      // This chunk is big enough. Move it before the other free chunks.
+      if (current_chunk_idx_ != first_free_idx) {
+        std::swap(chunks_[current_chunk_idx_], chunks_[first_free_idx]);
+        current_chunk_idx_ = first_free_idx;
+      }
+      break;
+    }
+  }
+
+  if (current_chunk_idx_ == static_cast<int>(chunks_.size())) {
+    // No free chunk is large enough: allocate a new one.
+    DCHECK_GE(next_chunk_size_, INITIAL_CHUNK_SIZE);
+    DCHECK_LE(next_chunk_size_, MAX_CHUNK_SIZE);
+    const int64_t chunk_size = std::max<int64_t>(min_size, next_chunk_size_);
+
+    uint8_t* buf = nullptr;
+    PARQUET_THROW_NOT_OK(allocator_->Allocate(chunk_size, &buf));
+    if (UNLIKELY(buf == nullptr)) {
+      DCHECK_EQ(current_chunk_idx_, static_cast<int>(chunks_.size()));
+      // Restore the index we started from. Resetting it to chunks_.size() - 1
+      // would make a free chunk current whenever free chunks exist, breaking the
+      // invariant that chunks up to current_chunk_idx_ hold data.
+      current_chunk_idx_ = first_free_idx - 1;
+      return false;
+    }
+
+    // If there are no free chunks put it at the end, otherwise before the first free.
+    if (first_free_idx == static_cast<int>(chunks_.size())) {
+      chunks_.push_back(ChunkInfo(chunk_size, buf));
+    } else {
+      current_chunk_idx_ = first_free_idx;
+      auto insert_chunk = chunks_.begin() + current_chunk_idx_;
+      chunks_.insert(insert_chunk, ChunkInfo(chunk_size, buf));
+    }
+    total_reserved_bytes_ += chunk_size;
+    // Don't increment the chunk size until the allocation succeeds: if an attempted
+    // large allocation fails we don't want to increase the chunk size further.
+    // (The cast is harmless: the value is capped at MAX_CHUNK_SIZE.)
+    next_chunk_size_ =
+        static_cast<int>(std::min<int64_t>(chunk_size * 2, MAX_CHUNK_SIZE));
+  }
+
+  DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size()));
+  DCHECK(CheckIntegrity(true));
+  return true;
+}
+
+// Moves the chunks of 'src' that hold data into this allocator, inserting them
+// right after this allocator's current chunk, and transfers the matching reserved
+// and allocated byte counts.
+// With keep_current, 'src' keeps its current chunk (plus any free chunks after it).
+// Otherwise every data-bearing chunk moves and 'src' is emptied via FreeAll().
+// NOTE(review): src == this is not handled.
+void ChunkedAllocator::AcquireData(ChunkedAllocator* src, bool keep_current) {
+ DCHECK(src->CheckIntegrity(false));
+ int num_acquired_chunks;
+ if (keep_current) {
+ // Take every chunk before src's current chunk.
+ num_acquired_chunks = src->current_chunk_idx_;
+ } else if (src->GetFreeOffset() == 0) {
+ // nothing in the last chunk
+ num_acquired_chunks = src->current_chunk_idx_;
+ } else {
+ num_acquired_chunks = src->current_chunk_idx_ + 1;
+ }
+
+ // Nothing to move (e.g. src holds no data).
+ if (num_acquired_chunks <= 0) {
+ if (!keep_current) src->FreeAll();
+ return;
+ }
+
+ auto end_chunk = src->chunks_.begin() + num_acquired_chunks;
+ int64_t total_transfered_bytes = 0;
+ for (auto i = src->chunks_.begin(); i != end_chunk; ++i) {
+ total_transfered_bytes += i->size;
+ }
+ src->total_reserved_bytes_ -= total_transfered_bytes;
+ total_reserved_bytes_ += total_transfered_bytes;
+
+ // insert new chunks after current_chunk_idx_ (i.e. before any free chunks here)
+ auto insert_chunk = chunks_.begin() + current_chunk_idx_ + 1;
+ chunks_.insert(insert_chunk, src->chunks_.begin(), end_chunk);
+ src->chunks_.erase(src->chunks_.begin(), end_chunk);
+ current_chunk_idx_ += num_acquired_chunks;
+
+ if (keep_current) {
+ // After the erase, src's kept current chunk is at index 0; only its bytes stay
+ // with src.
+ src->current_chunk_idx_ = 0;
+ DCHECK(src->chunks_.size() == 1 || src->chunks_[1].allocated_bytes == 0);
+ total_allocated_bytes_ += src->total_allocated_bytes_ - src->GetFreeOffset();
+ src->total_allocated_bytes_ = src->GetFreeOffset();
+ } else {
+ src->current_chunk_idx_ = -1;
+ total_allocated_bytes_ += src->total_allocated_bytes_;
+ src->total_allocated_bytes_ = 0;
+ }
+ peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
+
+ // Release src's remaining (free) chunks when it keeps nothing.
+ if (!keep_current) src->FreeAll();
+ DCHECK(CheckIntegrity(false));
+}
+
+// Human-readable dump: chunk count, then "0x<address>=<size>/<allocated_bytes>" for
+// every chunk, then the current chunk index and byte totals.
+std::string ChunkedAllocator::DebugString() {
+  std::stringstream out;
+  out << "ChunkedAllocator(#chunks=" << chunks_.size() << " [";
+  for (size_t i = 0; i < chunks_.size(); ++i) {
+    // Stream the address in hex instead of sprintf-ing it into a fixed buffer:
+    // a 64-bit pointer needs up to 20 bytes ("0x" + 16 digits + "=" + NUL), which
+    // overflowed the former 16-byte buffer, and "%lx" does not match size_t on
+    // LLP64 platforms. std::hex prints the same lowercase, unpadded digits.
+    out << (i > 0 ? " " : "") << "0x" << std::hex
+        << reinterpret_cast<uintptr_t>(chunks_[i].data) << std::dec << "="
+        << chunks_[i].size << "/" << chunks_[i].allocated_bytes;
+  }
+  out << "] current_chunk=" << current_chunk_idx_
+      << " total_sizes=" << GetTotalChunkSizes()
+      << " total_alloc=" << total_allocated_bytes_ << ")";
+  return out.str();
+}
+
+// Capacity of every chunk currently owned, whether it holds data or not.
+int64_t ChunkedAllocator::GetTotalChunkSizes() const {
+  int64_t total = 0;
+  for (const ChunkInfo& chunk : chunks_) {
+    total += chunk.size;
+  }
+  return total;
+}
+
+// Verifies the chunk-list invariants with DCHECKs and always returns true, so it
+// can be used as DCHECK(CheckIntegrity(...)). 'current_chunk_empty' says whether
+// the current chunk must be empty (true) or must already hold data (false).
+bool ChunkedAllocator::CheckIntegrity(bool current_chunk_empty) {
+  // check that current_chunk_idx_ points to the last chunk with allocated data
+  DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size()));
+  const int num_chunks = static_cast<int>(chunks_.size());
+  int64_t total_allocated = 0;
+  for (int i = 0; i < num_chunks; ++i) {
+    DCHECK_GT(chunks_[i].size, 0);
+    // Chunks before the current one hold data, chunks after it are empty, and the
+    // current chunk follows the caller's expectation.
+    const bool expect_data =
+        i < current_chunk_idx_ || (i == current_chunk_idx_ && !current_chunk_empty);
+    if (expect_data) {
+      DCHECK_GT(chunks_[i].allocated_bytes, 0);
+    } else {
+      DCHECK_EQ(chunks_[i].allocated_bytes, 0);
+    }
+    total_allocated += chunks_[i].allocated_bytes;
+  }
+  DCHECK_EQ(total_allocated, total_allocated_bytes_);
+  return true;
+}
+
+// ----------------------------------------------------------------------
+// Arrow IO wrappers
+
+// Close the underlying Arrow file or stream; a failing status is thrown as an exception.
+void ArrowFileMethods::Close() {
+  ::arrow::Status status = file_interface()->Close();
+  PARQUET_THROW_NOT_OK(status);
+}
+
+// Current position of the underlying Arrow file or stream, in bytes from its start.
+int64_t ArrowFileMethods::Tell() {
+  int64_t current_position = 0;
+  PARQUET_THROW_NOT_OK(file_interface()->Tell(&current_position));
+  return current_position;
+}
+
+ArrowInputFile::ArrowInputFile(
+ const std::shared_ptr<::arrow::io::ReadableFileInterface>& file)
+ : file_(file) {}
+
+// Non-owning view of the wrapped file, used by the shared ArrowFileMethods helpers.
+::arrow::io::FileInterface* ArrowInputFile::file_interface() {
+  ::arrow::io::FileInterface* const wrapped = file_.get();
+  return wrapped;
+}
+
+// Total size of the wrapped file in bytes; throws if it cannot be determined.
+int64_t ArrowInputFile::Size() const {
+  int64_t file_size = 0;
+  PARQUET_THROW_NOT_OK(file_->GetSize(&file_size));
+  return file_size;
+}
+
+// Moves the read position to 'position'; an out-of-range seek throws.
+void ArrowInputFile::Seek(int64_t position) {
+  ::arrow::Status status = file_->Seek(position);
+  PARQUET_THROW_NOT_OK(status);
+}
+
+// Reads up to 'nbytes' into 'out' and returns how many bytes were actually read.
+int64_t ArrowInputFile::Read(int64_t nbytes, uint8_t* out) {
+  int64_t num_read = 0;
+  PARQUET_THROW_NOT_OK(file_->Read(nbytes, &num_read, out));
+  return num_read;
+}
+
+// Reads up to 'nbytes' from the current position into a buffer provided by Arrow.
+std::shared_ptr<Buffer> ArrowInputFile::Read(int64_t nbytes) {
+  std::shared_ptr<Buffer> result;
+  PARQUET_THROW_NOT_OK(file_->Read(nbytes, &result));
+  return result;
+}
+
+// Reads up to 'nbytes' starting at absolute offset 'position'.
+std::shared_ptr<Buffer> ArrowInputFile::ReadAt(int64_t position, int64_t nbytes) {
+  std::shared_ptr<Buffer> result;
+  PARQUET_THROW_NOT_OK(file_->ReadAt(position, nbytes, &result));
+  return result;
+}
+
+// Shares ownership of the Arrow output stream 'file'; writes forward to it.
+ArrowOutputStream::ArrowOutputStream(
+    const std::shared_ptr<::arrow::io::OutputStream> file) {
+  file_ = file;
+}
+
+// The wrapped stream seen through the generic Arrow FileInterface, as required
+// by ArrowFileMethods. The pointer is borrowed; file_ keeps ownership.
+::arrow::io::FileInterface* ArrowOutputStream::file_interface() {
+  ::arrow::io::FileInterface* generic_file = file_.get();
+  return generic_file;
+}
+
+// Appends 'length' bytes from 'data' to the wrapped Arrow stream.
+void ArrowOutputStream::Write(const uint8_t* data, int64_t length) {
+  ::arrow::Status status = file_->Write(data, length);
+  PARQUET_THROW_NOT_OK(status);
+}
+
+// ----------------------------------------------------------------------
+// InMemoryInputStream
+
+// Streams over an existing buffer, starting at its first byte and spanning
+// all of it.
+InMemoryInputStream::InMemoryInputStream(const std::shared_ptr<Buffer>& buffer)
+    : buffer_(buffer), len_(buffer->size()), offset_(0) {}
+
+// Eagerly reads bytes [start, start + num_bytes) of 'source' into memory.
+// Throws ParquetException if the source yields fewer bytes than requested.
+InMemoryInputStream::InMemoryInputStream(
+    RandomAccessSource* source, int64_t start, int64_t num_bytes)
+    : buffer_(source->ReadAt(start, num_bytes)), offset_(0) {
+  len_ = buffer_->size();
+  if (len_ < num_bytes) {
+    throw ParquetException("Unable to read column chunk data");
+  }
+}
+
+// Returns a pointer to the unconsumed bytes without consuming them;
+// *num_bytes becomes the smaller of 'num_to_peek' and the bytes remaining.
+const uint8_t* InMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
+  const int64_t remaining = len_ - offset_;
+  *num_bytes = num_to_peek < remaining ? num_to_peek : remaining;
+  return buffer_->data() + offset_;
+}
+
+// Peek() plus consumption: the returned bytes are skipped by later calls.
+const uint8_t* InMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
+  const uint8_t* position = Peek(num_to_read, num_bytes);
+  offset_ = offset_ + *num_bytes;
+  return position;
+}
+
+// Skips 'num_bytes' bytes. No bounds check against the buffer length is done.
+void InMemoryInputStream::Advance(int64_t num_bytes) {
+  offset_ = offset_ + num_bytes;
+}
+
+// ----------------------------------------------------------------------
+// In-memory output stream
+
+// Creates an empty stream backed by a buffer obtained from 'allocator'.
+// A non-positive 'initial_capacity' selects kInMemoryDefaultCapacity.
+// capacity_ is resolved *before* allocating so that it always matches the
+// allocated size: previously 0 left capacity_ at 0 while the buffer held the
+// default size, and Write() then doubled 0 forever.
+InMemoryOutputStream::InMemoryOutputStream(
+    MemoryAllocator* allocator, int64_t initial_capacity)
+    : size_(0),
+      capacity_(initial_capacity > 0 ? initial_capacity : kInMemoryDefaultCapacity) {
+  buffer_ = AllocateBuffer(allocator, capacity_);
+}
+
+// Nothing to release explicitly: buffer_ is a shared_ptr.
+InMemoryOutputStream::~InMemoryOutputStream() = default;
+
+// Pointer to the first not-yet-written byte of the backing buffer.
+uint8_t* InMemoryOutputStream::Head() {
+  uint8_t* base = buffer_->mutable_data();
+  return base + size_;
+}
+
+// Appends 'length' bytes from 'data', growing the backing buffer by doubling
+// its capacity until the new bytes fit. Throws if the resize fails.
+void InMemoryOutputStream::Write(const uint8_t* data, int64_t length) {
+  const int64_t required = size_ + length;
+  if (required > capacity_) {
+    // Doubling a non-positive capacity never reaches 'required' (infinite
+    // loop), so start from the default capacity in that case.
+    int64_t new_capacity = capacity_ > 0 ? capacity_ * 2 : kInMemoryDefaultCapacity;
+    while (new_capacity < required) {
+      new_capacity *= 2;
+    }
+    PARQUET_THROW_NOT_OK(buffer_->Resize(new_capacity));
+    capacity_ = new_capacity;
+  }
+  memcpy(Head(), data, length);
+  size_ += length;
+}
+
+// The write position equals the number of bytes written so far.
+int64_t InMemoryOutputStream::Tell() { return size_; }
+
+// Trims the backing buffer to the bytes written and hands it to the caller.
+// The stream drops its own reference, so this can succeed only once; a second
+// call throws ParquetException instead of dereferencing a null buffer.
+// NOTE(review): Write() after GetBuffer() still dereferences the null buffer.
+std::shared_ptr<Buffer> InMemoryOutputStream::GetBuffer() {
+  if (buffer_ == nullptr) {
+    throw ParquetException("InMemoryOutputStream buffer has already been released");
+  }
+  PARQUET_THROW_NOT_OK(buffer_->Resize(size_));
+  std::shared_ptr<Buffer> result = buffer_;
+  buffer_ = nullptr;
+  return result;
+}
+
+// ----------------------------------------------------------------------
+// BufferedInputStream
+
+// Streams bytes [start, start + num_bytes) of 'source' through a buffer of
+// 'buffer_size' bytes taken from 'pool'. Nothing is read here: the buffer is
+// marked as fully consumed so the first Peek()/Read() triggers a fill.
+BufferedInputStream::BufferedInputStream(MemoryAllocator* pool, int64_t buffer_size,
+    RandomAccessSource* source, int64_t start, int64_t num_bytes)
+    : buffer_(AllocateBuffer(pool, buffer_size)),
+      source_(source),
+      stream_offset_(start),
+      stream_end_(start + num_bytes) {
+  buffer_size_ = buffer_->size();
+  buffer_offset_ = buffer_size_;
+}
+
+// Returns a pointer to the next *num_bytes bytes without consuming them, where
+// *num_bytes = min(num_to_peek, bytes left in the stream). The buffer grows to
+// fit large requests and is refilled from source_ (starting at stream_offset_)
+// when it does not hold enough unconsumed bytes. Throws ParquetException if
+// the source delivers fewer bytes than needed.
+const uint8_t* BufferedInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
+  *num_bytes = std::min(num_to_peek, stream_end_ - stream_offset_);
+  // Increase the buffer size if needed.
+  bool resized = false;
+  if (*num_bytes > buffer_size_) {
+    PARQUET_THROW_NOT_OK(buffer_->Resize(*num_bytes));
+    buffer_size_ = buffer_->size();
+    DCHECK(buffer_size_ >= *num_bytes);
+    resized = true;
+  }
+  // Refill when too few unconsumed bytes remain, or after any resize: the
+  // bytes past the previously loaded region were never read from the source.
+  // Without 'resized', a grow while buffer_offset_ == 0 skipped the refill
+  // and returned uninitialized bytes.
+  if (resized || *num_bytes > (buffer_size_ - buffer_offset_)) {
+    source_->Seek(stream_offset_);
+    buffer_size_ = std::min(buffer_size_, stream_end_ - stream_offset_);
+    int64_t bytes_read = source_->Read(buffer_size_, buffer_->mutable_data());
+    if (bytes_read < *num_bytes) {
+      throw ParquetException("Failed reading column data from source");
+    }
+    buffer_offset_ = 0;
+  }
+  return buffer_->data() + buffer_offset_;
+}
+
+// Peek() followed by consuming the returned bytes in both the buffer and the
+// overall stream position.
+const uint8_t* BufferedInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
+  const uint8_t* chunk = Peek(num_to_read, num_bytes);
+  const int64_t consumed = *num_bytes;
+  buffer_offset_ += consumed;
+  stream_offset_ += consumed;
+  return chunk;
+}
+
+// Skips 'num_bytes' bytes without returning them; once the buffered bytes run
+// out, the next Peek() refills the buffer from the source.
+void BufferedInputStream::Advance(int64_t num_bytes) {
+  buffer_offset_ += num_bytes;
+  stream_offset_ += num_bytes;
+}
+
+// Creates a PoolBuffer drawing memory from 'allocator'. When 'size' is
+// positive the buffer is resized to 'size' bytes; otherwise it starts empty.
+std::shared_ptr<PoolBuffer> AllocateBuffer(MemoryAllocator* allocator, int64_t size) {
+  std::shared_ptr<PoolBuffer> buffer = std::make_shared<PoolBuffer>(allocator);
+  if (size <= 0) {
+    return buffer;
+  }
+  PARQUET_THROW_NOT_OK(buffer->Resize(size));
+  return buffer;
+}
+
+// Same as AllocateBuffer(), but the result is uniquely owned.
+std::unique_ptr<PoolBuffer> AllocateUniqueBuffer(
+    MemoryAllocator* allocator, int64_t size) {
+  std::unique_ptr<PoolBuffer> buffer(new PoolBuffer(allocator));
+  if (size <= 0) {
+    return buffer;
+  }
+  PARQUET_THROW_NOT_OK(buffer->Resize(size));
+  return buffer;
+}
+
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/memory.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h
new file mode 100644
index 0000000..1ffca35
--- /dev/null
+++ b/src/parquet/util/memory.h
@@ -0,0 +1,440 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_UTIL_MEMORY_H
+#define PARQUET_UTIL_MEMORY_H
+
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
+#include "arrow/memory_pool.h"
+#include "arrow/status.h"
+
+#include "parquet/exception.h"
+#include "parquet/util/macros.h"
+#include "parquet/util/visibility.h"
+
+// Evaluates expression 's'; a ParquetException thrown by it is converted into
+// an ::arrow::Status::IOError returned from the enclosing function, which must
+// therefore return ::arrow::Status.
+#define PARQUET_CATCH_NOT_OK(s)                    \
+  try {                                            \
+    (s);                                           \
+  } catch (const ::parquet::ParquetException& e) { \
+    return ::arrow::Status::IOError(e.what());     \
+  }
+
+// Evaluates expression 's' and discards any ParquetException it throws.
+#define PARQUET_IGNORE_NOT_OK(s)                 \
+  try {                                          \
+    (s);                                         \
+  } catch (const ::parquet::ParquetException&) { \
+  }
+
+// Evaluates 's', which must yield an ::arrow::Status; if it is not OK, raises
+// it through ::parquet::ParquetException::Throw with the message
+// "Arrow error: <status>". The message is built with ::std::string (from
+// <string>) rather than std::stringstream, because this header does not
+// include <sstream>. The trailing semicolon after while (0) is kept as before,
+// so "PARQUET_THROW_NOT_OK(x);" expands to two statements and cannot be the
+// sole statement of an if that has an else.
+#define PARQUET_THROW_NOT_OK(s)                          \
+  do {                                                   \
+    ::arrow::Status _s = (s);                            \
+    if (!_s.ok()) {                                      \
+      ::parquet::ParquetException::Throw(                \
+          ::std::string("Arrow error: ") + _s.ToString()); \
+    }                                                    \
+  } while (0);
+
+namespace parquet {
+
+/// Default initial buffer size, in bytes, of an InMemoryOutputStream.
+/// (constexpr at namespace scope already has internal linkage.)
+constexpr int64_t kInMemoryDefaultCapacity = 1024;
+
+// Parquet names for the Arrow memory abstractions this library builds on.
+using MemoryAllocator = ::arrow::MemoryPool;
+using Buffer = ::arrow::Buffer;
+using MutableBuffer = ::arrow::MutableBuffer;
+using ResizableBuffer = ::arrow::ResizableBuffer;
+using PoolBuffer = ::arrow::PoolBuffer;
+
+/// Allocator used as the default argument by ChunkedAllocator and
+/// InMemoryOutputStream.
+PARQUET_EXPORT MemoryAllocator* default_allocator();
+
+/// MemoryAllocator that, per its member names, keeps a running total of bytes
+/// allocated and its high-water mark. Allocate() and Free() are defined out of
+/// line; stats_mutex_ presumably guards the counters there (confirm).
+/// NOTE(review): the accessors below read the counters without taking
+/// stats_mutex_, which is a data race if the allocator is shared across threads.
+class PARQUET_EXPORT TrackingAllocator : public MemoryAllocator {
+ public:
+  TrackingAllocator() : total_memory_(0), max_memory_(0) {}
+
+  ::arrow::Status Allocate(int64_t size, uint8_t** out) override;
+  void Free(uint8_t* p, int64_t size) override;
+
+  /// Bytes currently counted as allocated.
+  int64_t bytes_allocated() const override { return total_memory_; }
+
+  /// Recorded peak of allocated bytes. Declared const because it only reads
+  /// state, so it can be queried through a const TrackingAllocator.
+  int64_t max_memory() const { return max_memory_; }
+
+ private:
+  std::mutex stats_mutex_;
+  int64_t total_memory_;
+  int64_t max_memory_;
+};
+
+/// Growable array of T whose storage is held in a PoolBuffer (buffer_), i.e.
+/// memory obtained from the given MemoryAllocator. Member functions other than
+/// operator[] are defined out of line; notes below are hedged where this
+/// header alone does not establish the behavior. Non-copyable.
+template <class T>
+class Vector {
+ public:
+ /// Presumably creates a vector of 'size' elements backed by 'allocator' -- confirm in the definition.
+ explicit Vector(int64_t size, MemoryAllocator* allocator);
+ /// Change the number of elements to 'new_size'.
+ void Resize(int64_t new_size);
+ /// Ensure room for at least 'new_capacity' elements (presumably without changing the size).
+ void Reserve(int64_t new_capacity);
+ /// Make the vector hold 'size' copies of 'val'.
+ void Assign(int64_t size, const T val);
+ /// Exchange contents with 'v'.
+ void Swap(Vector<T>& v);
+ /// Unchecked element access. NOTE(review): declared const yet returns a
+ /// mutable reference, so elements can be modified through a const Vector.
+ inline T& operator[](int64_t i) const { return data_[i]; }
+
+ private:
+ std::unique_ptr<PoolBuffer> buffer_;
+ int64_t size_;
+ int64_t capacity_;
+ /// Presumably points into buffer_'s memory -- confirm in the definitions.
+ T* data_;
+
+ DISALLOW_COPY_AND_ASSIGN(Vector);
+};
+
+/// A ChunkedAllocator maintains a list of memory chunks from which it
+/// allocates memory in response to Allocate() calls. Chunks stay around for
+/// the lifetime of the allocator or until they are passed on to another
+/// allocator via AcquireData().
+///
+/// An Allocate() call first tries the current chunk (the one that served the
+/// previous allocation); if that chunk doesn't have enough free space for the
+/// request, the free chunks are searched for one that is big enough, and
+/// otherwise a new chunk is added to the list.
+/// current_chunk_idx_ always points to the last chunk with allocated memory.
+/// To keep allocation overhead low, chunk sizes double with each new chunk
+/// added, until they hit a maximum size (MAX_CHUNK_SIZE).
+///
+/// Example:
+///   ChunkedAllocator* p = new ChunkedAllocator();
+///   for (int i = 0; i < 1024; ++i) {
+///     // returns 8-byte aligned memory (effectively 24 bytes):
+///     .. = p->Allocate(17);
+///   }
+/// At this point 17K have been requested through Allocate() calls and 28K of
+/// chunks have been allocated (chunk sizes: 4K, 8K, 16K).
+/// We track total and peak allocated bytes; at this point they are the same.
+/// NOTE(review): this text used to give both as 28k. Per the member comments
+/// below, total_allocated_bytes_ sums the per-chunk allocated_bytes (what
+/// Allocate() handed out, ~24K here with 8-byte rounding), while the 28K of
+/// chunk memory is what total_reserved_bytes_ describes -- confirm.
+/// A call to Clear() makes the allocated memory reusable, so
+/// total_allocated_bytes_ becomes 0 while peak_allocated_bytes_ is unchanged:
+///   p->Clear();
+///   // the entire 1st chunk is returned:
+///   .. = p->Allocate(4 * 1024);
+///   // 4K of the 2nd chunk are returned:
+///   .. = p->Allocate(4 * 1024);
+///   // a new 20K chunk is created:
+///   .. = p->Allocate(20 * 1024);
+///
+///   ChunkedAllocator* p2 = new ChunkedAllocator();
+///   // the new ChunkedAllocator receives all chunks containing data from p:
+///   p2->AcquireData(p, false);
+/// At this point p's total_allocated_bytes_ would be 0 while its
+/// peak_allocated_bytes_ remains unchanged.
+///   // the one remaining (empty) chunk is released:
+///   delete p;
+
+class ChunkedAllocator {
+ public:
+ explicit ChunkedAllocator(MemoryAllocator* allocator = default_allocator());
+
+ /// Frees all chunks of memory. NOTE(review): no limit tracking is visible in
+ /// this class; confirm any accounting beyond freeing chunks in the definition.
+ ~ChunkedAllocator();
+
+ /// Allocates an 8-byte aligned section of memory of 'size' bytes at the end
+ /// of the current chunk. Creates a new chunk if there aren't any chunks
+ /// with enough capacity.
+ uint8_t* Allocate(int size);
+
+ /// Returns the last 'byte_size' bytes allocated from the current chunk for
+ /// re-use. Only valid for returning all or part of the most recent
+ /// allocation returned by Allocate().
+ void ReturnPartialAllocation(int byte_size);
+
+ /// Makes all allocated chunks available for re-use, but doesn't delete any chunks.
+ void Clear();
+
+ /// Deletes all allocated chunks. FreeAll() or AcquireData() must be called for
+ /// each allocator. NOTE(review): the destructor is also documented as freeing
+ /// all chunks; confirm whether an explicit call is still required.
+ void FreeAll();
+
+ /// Absorb all chunks that hold data from src. If keep_current is true, let src hold on
+ /// to its last allocated chunk that contains data.
+ /// Any offsets previously obtained for 'src' become invalid. (The original text
+ /// named GetCurrentOffset(), which this class does not declare; the closest is
+ /// the private GetFreeOffset().)
+ void AcquireData(ChunkedAllocator* src, bool keep_current);
+
+ /// Human-readable description of the allocator state, for debugging.
+ std::string DebugString();
+
+ /// Bytes currently handed out by Allocate(), summed over all chunks.
+ int64_t total_allocated_bytes() const { return total_allocated_bytes_; }
+ /// Maximum number of bytes allocated from this allocator at one time.
+ int64_t peak_allocated_bytes() const { return peak_allocated_bytes_; }
+ /// Bytes held in chunks, whether or not they have been handed out.
+ int64_t total_reserved_bytes() const { return total_reserved_bytes_; }
+
+ /// Return the sum of the sizes of all chunks (presumably chunks_[i].size).
+ int64_t GetTotalChunkSizes() const;
+
+ private:
+ friend class ChunkedAllocatorTest;
+ /// Size in bytes of the first chunk; later chunks double in size.
+ static const int INITIAL_CHUNK_SIZE = 4 * 1024;
+
+ /// The maximum size of chunk that should be allocated. Allocations larger than this
+ /// size will get their own individual chunk.
+ static const int MAX_CHUNK_SIZE = 1024 * 1024;
+
+ /// Bookkeeping for one chunk of memory.
+ struct ChunkInfo {
+ uint8_t* data; // Start of the chunk; ChunkInfo declares no destructor, so presumably released by ChunkedAllocator.
+ int64_t size; // in bytes
+
+ /// bytes allocated via Allocate() in this chunk
+ int64_t allocated_bytes;
+
+ explicit ChunkInfo(int64_t size, uint8_t* buf);
+
+ ChunkInfo() : data(NULL), size(0), allocated_bytes(0) {}
+ };
+
+ /// chunk from which we served the last Allocate() call;
+ /// always points to the last chunk that contains allocated data;
+ /// chunks 0..current_chunk_idx_ are guaranteed to contain data
+ /// (chunks_[i].allocated_bytes > 0 for i: 0..current_chunk_idx_), except that
+ /// CheckIntegrity(true) allows the current chunk itself to be empty;
+ /// -1 if no chunks present
+ int current_chunk_idx_;
+
+ /// The size of the next chunk to allocate.
+ int64_t next_chunk_size_;
+
+ /// sum of chunks_[i].allocated_bytes over all chunks
+ int64_t total_allocated_bytes_;
+
+ /// Maximum number of bytes allocated from this pool at one time.
+ int64_t peak_allocated_bytes_;
+
+ /// sum of the sizes of all chunks in chunks_ (bytes reserved from allocator_)
+ int64_t total_reserved_bytes_;
+
+ /// All chunks held by this allocator; indexed by current_chunk_idx_.
+ std::vector<ChunkInfo> chunks_;
+
+ /// Allocator from which chunk memory is obtained.
+ MemoryAllocator* allocator_;
+
+ /// Find or allocate a chunk with at least min_size spare capacity and update
+ /// current_chunk_idx_. Also updates chunks_ and the byte counters if a new
+ /// chunk needs to be created. NOTE(review): the meaning of the bool result
+ /// (presumably success) is defined in the .cc -- confirm.
+ bool FindChunk(int64_t min_size);
+
+ /// Check integrity of the supporting data structures; always returns true but DCHECKs
+ /// all invariants.
+ /// If 'current_chunk_empty' is false, checks that the current chunk contains data;
+ /// if true, checks that it is empty.
+ bool CheckIntegrity(bool current_chunk_empty);
+
+ /// Return offset to unoccupied space in current chunk (0 if there are no chunks).
+ /// NOTE(review): allocated_bytes is int64_t but the return type is int.
+ int GetFreeOffset() const {
+ if (current_chunk_idx_ == -1) return 0;
+ return chunks_[current_chunk_idx_].allocated_bytes;
+ }
+
+ /// Presumably the implementation behind the public Allocate(int); the meaning
+ /// of CHECK_LIMIT_FIRST is defined in the .cc -- confirm there.
+ template <bool CHECK_LIMIT_FIRST>
+ uint8_t* Allocate(int size);
+};
+
+// Parquet file input and output interfaces. The Arrow-backed implementations
+// below translate arrow::Status errors into exceptions.
+
+/// Common base of the Parquet file abstractions.
+class PARQUET_EXPORT FileInterface {
+ public:
+  /// Virtual so objects can be destroyed through a FileInterface pointer:
+  /// deleting a derived object via a base without a virtual destructor is
+  /// undefined behavior.
+  virtual ~FileInterface() {}
+
+  /// Close the file.
+  virtual void Close() = 0;
+
+  /// Return the current position in the file relative to the start.
+  virtual int64_t Tell() = 0;
+};
+
+/// Random-access, file-like input source.
+class PARQUET_EXPORT RandomAccessSource : virtual public FileInterface {
+ public:
+  virtual ~RandomAccessSource() = default;
+
+  /// Total size of the source in bytes.
+  virtual int64_t Size() const = 0;
+
+  /// Move the read position to absolute offset 'position'.
+  virtual void Seek(int64_t position) = 0;
+
+  /// Read up to 'nbytes' bytes from the current position into 'out';
+  /// returns the number of bytes actually read.
+  virtual int64_t Read(int64_t nbytes, uint8_t* out) = 0;
+
+  /// Read up to 'nbytes' bytes from the current position into a Buffer.
+  virtual std::shared_ptr<Buffer> Read(int64_t nbytes) = 0;
+
+  /// Read up to 'nbytes' bytes starting at absolute offset 'position'.
+  virtual std::shared_ptr<Buffer> ReadAt(int64_t position, int64_t nbytes) = 0;
+};
+
+/// Sequential, write-only output.
+class PARQUET_EXPORT OutputStream : virtual public FileInterface {
+ public:
+  virtual ~OutputStream() = default;
+
+  /// Append 'length' bytes starting at 'data' to the stream.
+  virtual void Write(const uint8_t* data, int64_t length) = 0;
+};
+
+/// Mixin implementing Close() and Tell() for the Arrow-backed wrappers by
+/// forwarding to the Arrow file returned by file_interface(); a non-OK
+/// arrow::Status is turned into an exception via PARQUET_THROW_NOT_OK.
+class PARQUET_EXPORT ArrowFileMethods : virtual public FileInterface {
+ public:
+ void Close() override;
+ int64_t Tell() override;
+
+ protected:
+ /// Implemented by subclasses to expose the wrapped Arrow file. The returned
+ /// pointer is borrowed; the subclass keeps ownership.
+ virtual ::arrow::io::FileInterface* file_interface() = 0;
+};
+
+/// RandomAccessSource backed by an ::arrow::io::ReadableFileInterface. Each
+/// operation forwards to the Arrow file and throws on a non-OK arrow::Status.
+class PARQUET_EXPORT ArrowInputFile : public ArrowFileMethods, public RandomAccessSource {
+ public:
+ explicit ArrowInputFile(
+ const std::shared_ptr<::arrow::io::ReadableFileInterface>& file);
+
+ /// Total size of the wrapped file in bytes.
+ int64_t Size() const override;
+
+ /// Move the read position to absolute offset 'position'.
+ void Seek(int64_t position) override;
+
+ // Returns bytes read
+ int64_t Read(int64_t nbytes, uint8_t* out) override;
+
+ /// Read up to 'nbytes' bytes from the current position into a Buffer.
+ std::shared_ptr<Buffer> Read(int64_t nbytes) override;
+
+ /// Read up to 'nbytes' bytes starting at absolute offset 'position'.
+ std::shared_ptr<Buffer> ReadAt(int64_t position, int64_t nbytes) override;
+
+ /// The wrapped Arrow file (shared ownership).
+ std::shared_ptr<::arrow::io::ReadableFileInterface> file() const { return file_; }
+
+ // Diamond inheritance: Close() and Tell() come from ArrowFileMethods.
+ using ArrowFileMethods::Close;
+ using ArrowFileMethods::Tell;
+
+ private:
+ ::arrow::io::FileInterface* file_interface() override;
+ std::shared_ptr<::arrow::io::ReadableFileInterface> file_;
+};
+
+/// OutputStream backed by an ::arrow::io::OutputStream; a non-OK arrow::Status
+/// from the wrapped stream is thrown as an exception.
+class PARQUET_EXPORT ArrowOutputStream : public ArrowFileMethods, public OutputStream {
+ public:
+  explicit ArrowOutputStream(const std::shared_ptr<::arrow::io::OutputStream> file);
+
+  // Copy bytes into the output stream
+  void Write(const uint8_t* data, int64_t length) override;
+
+  /// The wrapped Arrow stream (shared ownership). Declared const because it
+  /// does not modify the wrapper, matching ArrowInputFile::file().
+  std::shared_ptr<::arrow::io::OutputStream> file() const { return file_; }
+
+  // Diamond inheritance: Close() and Tell() come from ArrowFileMethods.
+  using ArrowFileMethods::Close;
+  using ArrowFileMethods::Tell;
+
+ private:
+  ::arrow::io::FileInterface* file_interface() override;
+  std::shared_ptr<::arrow::io::OutputStream> file_;
+};
+
+/// OutputStream that accumulates written bytes in a growable buffer obtained
+/// from 'allocator'.
+class PARQUET_EXPORT InMemoryOutputStream : public OutputStream {
+ public:
+  explicit InMemoryOutputStream(MemoryAllocator* allocator = default_allocator(),
+      int64_t initial_capacity = kInMemoryDefaultCapacity);
+
+  ~InMemoryOutputStream() override;
+
+  // Close is currently a no-op with the in-memory stream
+  void Close() override {}
+
+  /// Number of bytes written so far.
+  int64_t Tell() override;
+
+  /// Append 'length' bytes from 'data', growing the buffer as needed.
+  void Write(const uint8_t* data, int64_t length) override;
+
+  /// Return the complete stream as a Buffer trimmed to the bytes written.
+  /// The stream releases its reference to the buffer, so call this once,
+  /// after the last Write().
+  std::shared_ptr<Buffer> GetBuffer();
+
+ private:
+  // Mutable pointer to the current write position in the stream
+  uint8_t* Head();
+
+  std::shared_ptr<ResizableBuffer> buffer_;
+  int64_t size_;
+  int64_t capacity_;
+
+  DISALLOW_COPY_AND_ASSIGN(InMemoryOutputStream);
+};
+
+// ----------------------------------------------------------------------
+// Streaming input interfaces
+
+/// Interface through which the column reader obtains bytes. It is a stream
+/// interface: bytes are delivered in order, and once a byte has been read it
+/// does not need to be read again.
+class InputStream {
+ public:
+  /// Returns a pointer to the next 'num_to_peek' bytes without advancing the
+  /// position. *num_bytes receives the number of bytes actually returned,
+  /// which is smaller than 'num_to_peek' only at the end of the stream.
+  /// Because the position does not move, repeated calls are idempotent.
+  /// The returned memory belongs to the stream and must stay valid until the
+  /// next call to Peek() or Read().
+  virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) = 0;
+
+  /// Like Peek(), but also advances the position by *num_bytes.
+  virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) = 0;
+
+  /// Skips 'num_bytes' bytes without returning them.
+  virtual void Advance(int64_t num_bytes) = 0;
+
+  virtual ~InputStream() = default;
+
+ protected:
+  /// Only subclasses can be instantiated.
+  InputStream() = default;
+};
+
+/// InputStream over bytes that are entirely resident in memory.
+class InMemoryInputStream : public InputStream {
+ public:
+  /// Reads bytes [start, start + num_bytes) of 'source' into memory up front.
+  /// The third parameter is a byte count, not an end offset (it was misnamed
+  /// 'end' here, unlike in the definition).
+  InMemoryInputStream(RandomAccessSource* source, int64_t start, int64_t num_bytes);
+  /// Streams over an existing buffer, starting at its first byte.
+  explicit InMemoryInputStream(const std::shared_ptr<Buffer>& buffer);
+
+  const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) override;
+  const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) override;
+
+  void Advance(int64_t num_bytes) override;
+
+ private:
+  std::shared_ptr<Buffer> buffer_;
+  int64_t len_;     // number of bytes in buffer_
+  int64_t offset_;  // position of the next unconsumed byte in buffer_
+};
+
+/// InputStream that keeps only a window of the underlying bytes in memory and
+/// refills it from a RandomAccessSource on demand.
+class BufferedInputStream : public InputStream {
+ public:
+  /// Streams bytes [start, start + num_bytes) of 'source' through a buffer of
+  /// 'buffer_size' bytes taken from 'pool'. The last parameter is a byte
+  /// count, not an end offset (it was misnamed 'end' here, unlike in the
+  /// definition).
+  BufferedInputStream(MemoryAllocator* pool, int64_t buffer_size,
+      RandomAccessSource* source, int64_t start, int64_t num_bytes);
+  const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) override;
+  const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) override;
+
+  void Advance(int64_t num_bytes) override;
+
+ private:
+  std::shared_ptr<PoolBuffer> buffer_;
+  RandomAccessSource* source_;  // never freed here; presumably owned by the caller
+  int64_t stream_offset_;       // absolute source position of the next unconsumed byte
+  int64_t stream_end_;          // absolute source position one past the last byte
+  int64_t buffer_offset_;       // position in buffer_ of the next unconsumed byte
+  int64_t buffer_size_;         // number of valid bytes loaded in buffer_
+};
+
+/// Create a PoolBuffer drawing memory from 'allocator', resized to 'size'
+/// bytes when 'size' is positive (empty otherwise). A failed resize is raised
+/// through ParquetException::Throw. Marked PARQUET_EXPORT for consistency with
+/// default_allocator().
+PARQUET_EXPORT std::shared_ptr<PoolBuffer> AllocateBuffer(
+    MemoryAllocator* allocator, int64_t size = 0);
+
+/// Same as AllocateBuffer(), but the result is uniquely owned.
+PARQUET_EXPORT std::unique_ptr<PoolBuffer> AllocateUniqueBuffer(
+    MemoryAllocator* allocator, int64_t size = 0);
+
+} // namespace parquet
+
+#endif // PARQUET_UTIL_MEMORY_H