You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2016/03/16 17:34:21 UTC
parquet-cpp git commit: PARQUET-542: Support custom memory allocators
Repository: parquet-cpp
Updated Branches:
refs/heads/master 5c4f64557 -> 225fba78b
PARQUET-542: Support custom memory allocators
Added `MemoryAllocator` interface and a default implementation, ensured `MemPool` can use a custom allocator, added `Vector<T>` to replace `std::vector<T>`.
Author: Aliaksei Sandryhaila <al...@hp.com>
Closes #72 from asandryh/PARQUET-542 and squashes the following commits:
a740edb [Aliaksei Sandryhaila] Incorporated PR feedback.
6422e0d [Aliaksei Sandryhaila] Added MemoryAllocator interface and default implementation, ensured MemPool can use a custom allocator, added Vector<T> to replace std::vector<T>.
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/225fba78
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/225fba78
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/225fba78
Branch: refs/heads/master
Commit: 225fba78b7aa162f9cd3608df3b48a309b8557c4
Parents: 5c4f645
Author: Aliaksei Sandryhaila <al...@hp.com>
Authored: Wed Mar 16 09:34:35 2016 -0700
Committer: Wes McKinney <we...@apache.org>
Committed: Wed Mar 16 09:34:35 2016 -0700
----------------------------------------------------------------------
src/parquet/api/io.h | 1 +
src/parquet/column/reader.cc | 27 +++---
src/parquet/column/reader.h | 12 ++-
src/parquet/column/scanner.cc | 19 +++--
src/parquet/column/scanner.h | 17 ++--
src/parquet/column/test-util.h | 10 +--
src/parquet/encodings/decoder.h | 4 +-
src/parquet/encodings/delta-bit-pack-encoding.h | 13 +--
.../encodings/delta-byte-array-encoding.h | 7 +-
.../delta-length-byte-array-encoding.h | 8 +-
src/parquet/encodings/dictionary-encoding.h | 45 +++++-----
src/parquet/encodings/encoder.h | 5 +-
src/parquet/encodings/encoding-test.cc | 5 +-
src/parquet/encodings/plain-encoding.h | 15 ++--
src/parquet/file/file-deserialize-test.cc | 5 +-
src/parquet/file/reader-internal.cc | 25 +++---
src/parquet/file/reader-internal.h | 17 ++--
src/parquet/file/reader.cc | 15 ++--
src/parquet/file/reader.h | 7 +-
src/parquet/reader-test.cc | 7 +-
src/parquet/util/CMakeLists.txt | 3 +
src/parquet/util/buffer-test.cc | 2 -
src/parquet/util/buffer.cc | 90 ++++++++++++++++++--
src/parquet/util/buffer.h | 37 +++++++-
src/parquet/util/input-output-test.cc | 2 +-
src/parquet/util/input.cc | 14 ++-
src/parquet/util/input.h | 18 ++--
src/parquet/util/mem-allocator-test.cc | 67 +++++++++++++++
src/parquet/util/mem-allocator.cc | 61 +++++++++++++
src/parquet/util/mem-allocator.h | 60 +++++++++++++
src/parquet/util/mem-pool.cc | 11 +--
src/parquet/util/mem-pool.h | 9 +-
src/parquet/util/output.cc | 13 +--
src/parquet/util/output.h | 6 +-
34 files changed, 496 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/api/io.h
----------------------------------------------------------------------
diff --git a/src/parquet/api/io.h b/src/parquet/api/io.h
index a8e555d..3a9b148 100644
--- a/src/parquet/api/io.h
+++ b/src/parquet/api/io.h
@@ -21,6 +21,7 @@
#include "parquet/exception.h"
#include "parquet/util/buffer.h"
#include "parquet/util/input.h"
+#include "parquet/util/mem-allocator.h"
#include "parquet/util/output.h"
#endif // PARQUET_API_IO_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/column/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc
index bf76d4c..902704b 100644
--- a/src/parquet/column/reader.cc
+++ b/src/parquet/column/reader.cc
@@ -29,11 +29,12 @@
namespace parquet_cpp {
ColumnReader::ColumnReader(const ColumnDescriptor* descr,
- std::unique_ptr<PageReader> pager)
+ std::unique_ptr<PageReader> pager, MemoryAllocator* allocator)
: descr_(descr),
pager_(std::move(pager)),
num_buffered_values_(0),
- num_decoded_values_(0) {}
+ num_decoded_values_(0),
+ allocator_(allocator) {}
template <int TYPE>
void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) {
@@ -59,7 +60,7 @@ void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) {
// TODO(wesm): investigate whether this all-or-nothing decoding of the
// dictionary makes sense and whether performance can be improved
- auto decoder = std::make_shared<DictionaryDecoder<TYPE> >(descr_);
+ auto decoder = std::make_shared<DictionaryDecoder<TYPE> >(descr_, allocator_);
decoder->SetDict(&dictionary);
decoders_[encoding] = decoder;
} else {
@@ -196,24 +197,26 @@ int64_t ColumnReader::ReadRepetitionLevels(int64_t batch_size, int16_t* levels)
std::shared_ptr<ColumnReader> ColumnReader::Make(
const ColumnDescriptor* descr,
- std::unique_ptr<PageReader> pager) {
+ std::unique_ptr<PageReader> pager,
+ MemoryAllocator* allocator) {
switch (descr->physical_type()) {
case Type::BOOLEAN:
- return std::make_shared<BoolReader>(descr, std::move(pager));
+ return std::make_shared<BoolReader>(descr, std::move(pager), allocator);
case Type::INT32:
- return std::make_shared<Int32Reader>(descr, std::move(pager));
+ return std::make_shared<Int32Reader>(descr, std::move(pager), allocator);
case Type::INT64:
- return std::make_shared<Int64Reader>(descr, std::move(pager));
+ return std::make_shared<Int64Reader>(descr, std::move(pager), allocator);
case Type::INT96:
- return std::make_shared<Int96Reader>(descr, std::move(pager));
+ return std::make_shared<Int96Reader>(descr, std::move(pager), allocator);
case Type::FLOAT:
- return std::make_shared<FloatReader>(descr, std::move(pager));
+ return std::make_shared<FloatReader>(descr, std::move(pager), allocator);
case Type::DOUBLE:
- return std::make_shared<DoubleReader>(descr, std::move(pager));
+ return std::make_shared<DoubleReader>(descr, std::move(pager), allocator);
case Type::BYTE_ARRAY:
- return std::make_shared<ByteArrayReader>(descr, std::move(pager));
+ return std::make_shared<ByteArrayReader>(descr, std::move(pager), allocator);
case Type::FIXED_LEN_BYTE_ARRAY:
- return std::make_shared<FixedLenByteArrayReader>(descr, std::move(pager));
+ return std::make_shared<FixedLenByteArrayReader>(descr,
+ std::move(pager), allocator);
default:
ParquetException::NYI("type reader not implemented");
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
index f6bf100..7c880fb 100644
--- a/src/parquet/column/reader.h
+++ b/src/parquet/column/reader.h
@@ -30,15 +30,17 @@
#include "parquet/exception.h"
#include "parquet/schema/descriptor.h"
#include "parquet/types.h"
+#include "parquet/util/mem-allocator.h"
namespace parquet_cpp {
class ColumnReader {
public:
- ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>);
+ ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>,
+ MemoryAllocator* allocator = default_allocator());
static std::shared_ptr<ColumnReader> Make(const ColumnDescriptor*,
- std::unique_ptr<PageReader>);
+ std::unique_ptr<PageReader>, MemoryAllocator* allocator = default_allocator());
// Returns true if there are still values in this column.
bool HasNext() {
@@ -95,6 +97,8 @@ class ColumnReader {
// The number of values from the current data page that have been decoded
// into memory
int num_decoded_values_;
+
+ MemoryAllocator* allocator_;
};
// API to read values from a single column. This is the main client facing API.
@@ -104,8 +108,8 @@ class TypedColumnReader : public ColumnReader {
typedef typename type_traits<TYPE>::value_type T;
TypedColumnReader(const ColumnDescriptor* schema,
- std::unique_ptr<PageReader> pager) :
- ColumnReader(schema, std::move(pager)),
+ std::unique_ptr<PageReader> pager, MemoryAllocator* allocator) :
+ ColumnReader(schema, std::move(pager), allocator),
current_decoder_(NULL) {
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/column/scanner.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner.cc b/src/parquet/column/scanner.cc
index 4a0b32f..f88b37c 100644
--- a/src/parquet/column/scanner.cc
+++ b/src/parquet/column/scanner.cc
@@ -25,24 +25,25 @@
namespace parquet_cpp {
std::shared_ptr<Scanner> Scanner::Make(std::shared_ptr<ColumnReader> col_reader,
- int64_t batch_size) {
+ int64_t batch_size, MemoryAllocator* allocator) {
switch (col_reader->type()) {
case Type::BOOLEAN:
- return std::make_shared<BoolScanner>(col_reader, batch_size);
+ return std::make_shared<BoolScanner>(col_reader, batch_size, allocator);
case Type::INT32:
- return std::make_shared<Int32Scanner>(col_reader, batch_size);
+ return std::make_shared<Int32Scanner>(col_reader, batch_size, allocator);
case Type::INT64:
- return std::make_shared<Int64Scanner>(col_reader, batch_size);
+ return std::make_shared<Int64Scanner>(col_reader, batch_size, allocator);
case Type::INT96:
- return std::make_shared<Int96Scanner>(col_reader, batch_size);
+ return std::make_shared<Int96Scanner>(col_reader, batch_size, allocator);
case Type::FLOAT:
- return std::make_shared<FloatScanner>(col_reader, batch_size);
+ return std::make_shared<FloatScanner>(col_reader, batch_size, allocator);
case Type::DOUBLE:
- return std::make_shared<DoubleScanner>(col_reader, batch_size);
+ return std::make_shared<DoubleScanner>(col_reader, batch_size, allocator);
case Type::BYTE_ARRAY:
- return std::make_shared<ByteArrayScanner>(col_reader, batch_size);
+ return std::make_shared<ByteArrayScanner>(col_reader, batch_size, allocator);
case Type::FIXED_LEN_BYTE_ARRAY:
- return std::make_shared<FixedLenByteArrayScanner>(col_reader, batch_size);
+ return std::make_shared<FixedLenByteArrayScanner>(col_reader,
+ batch_size, allocator);
default:
ParquetException::NYI("type reader not implemented");
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/column/scanner.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner.h b/src/parquet/column/scanner.h
index f83cd81..ce2c26f 100644
--- a/src/parquet/column/scanner.h
+++ b/src/parquet/column/scanner.h
@@ -29,6 +29,7 @@
#include "parquet/exception.h"
#include "parquet/schema/descriptor.h"
#include "parquet/types.h"
+#include "parquet/util/mem-allocator.h"
namespace parquet_cpp {
@@ -37,10 +38,12 @@ static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE = 128;
class Scanner {
public:
explicit Scanner(std::shared_ptr<ColumnReader> reader,
- int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) :
+ int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
+ MemoryAllocator* allocator = default_allocator()) :
batch_size_(batch_size),
level_offset_(0),
levels_buffered_(0),
+ value_buffer_(0, allocator),
value_offset_(0),
values_buffered_(0),
reader_(reader) {
@@ -52,7 +55,8 @@ class Scanner {
virtual ~Scanner() {}
static std::shared_ptr<Scanner> Make(std::shared_ptr<ColumnReader> col_reader,
- int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE);
+ int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
+ MemoryAllocator* allocator = default_allocator());
virtual void PrintNext(std::ostream& out, int width) = 0;
@@ -78,7 +82,7 @@ class Scanner {
int level_offset_;
int levels_buffered_;
- std::vector<uint8_t> value_buffer_;
+ OwnedMutableBuffer value_buffer_;
int value_offset_;
int64_t values_buffered_;
@@ -93,11 +97,12 @@ class TypedScanner : public Scanner {
typedef typename type_traits<TYPE>::value_type T;
explicit TypedScanner(std::shared_ptr<ColumnReader> reader,
- int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) :
- Scanner(reader, batch_size) {
+ int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
+ MemoryAllocator* allocator = default_allocator()) :
+ Scanner(reader, batch_size, allocator) {
typed_reader_ = static_cast<TypedColumnReader<TYPE>*>(reader.get());
int value_byte_size = type_traits<TYPE>::value_byte_size;
- value_buffer_.resize(batch_size_ * value_byte_size);
+ value_buffer_.Resize(batch_size_ * value_byte_size);
values_ = reinterpret_cast<T*>(&value_buffer_[0]);
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
index 4d10a42..0b9b599 100644
--- a/src/parquet/column/test-util.h
+++ b/src/parquet/column/test-util.h
@@ -252,7 +252,7 @@ class DictionaryPageBuilder {
if (TN == Type::FIXED_LEN_BYTE_ARRAY) {
type_length = d->type_length();
}
- encoder_.reset(new DictEncoder<TC>(&pool_, type_length));
+ encoder_.reset(new DictEncoder<TC>(&pool_, default_allocator(), type_length));
}
~DictionaryPageBuilder() {
@@ -260,7 +260,6 @@ class DictionaryPageBuilder {
}
shared_ptr<Buffer> AppendValues(const vector<TC>& values) {
- shared_ptr<OwnedMutableBuffer> rle_indices = std::make_shared<OwnedMutableBuffer>();
int num_values = values.size();
// Dictionary encoding
for (int i = 0; i < num_values; ++i) {
@@ -268,7 +267,8 @@ class DictionaryPageBuilder {
}
num_dict_values_ = encoder_->num_entries();
have_values_ = true;
- rle_indices->Resize(sizeof(int) * encoder_->EstimatedDataEncodedSize());
+ shared_ptr<OwnedMutableBuffer> rle_indices = std::make_shared<OwnedMutableBuffer>(
+ sizeof(int) * encoder_->EstimatedDataEncodedSize());
int actual_bytes = encoder_->WriteIndices(rle_indices->mutable_data(),
rle_indices->size());
rle_indices->Resize(actual_bytes);
@@ -277,8 +277,8 @@ class DictionaryPageBuilder {
}
shared_ptr<Buffer> WriteDict() {
- shared_ptr<OwnedMutableBuffer> dict_buffer = std::make_shared<OwnedMutableBuffer>();
- dict_buffer->Resize(encoder_->dict_encoded_size());
+ shared_ptr<OwnedMutableBuffer> dict_buffer = std::make_shared<OwnedMutableBuffer>(
+ encoder_->dict_encoded_size());
encoder_->WriteDict(dict_buffer->mutable_data());
return dict_buffer;
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/encodings/decoder.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/decoder.h b/src/parquet/encodings/decoder.h
index 55b29e8..5bbec0b 100644
--- a/src/parquet/encodings/decoder.h
+++ b/src/parquet/encodings/decoder.h
@@ -22,6 +22,7 @@
#include "parquet/exception.h"
#include "parquet/types.h"
+#include "parquet/util/mem-allocator.h"
namespace parquet_cpp {
@@ -54,8 +55,7 @@ class Decoder {
const Encoding::type encoding() const { return encoding_; }
protected:
- explicit Decoder(const ColumnDescriptor* descr,
- const Encoding::type& encoding)
+ explicit Decoder(const ColumnDescriptor* descr, const Encoding::type& encoding)
: descr_(descr), encoding_(encoding), num_values_(0) {}
// For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/encodings/delta-bit-pack-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-bit-pack-encoding.h b/src/parquet/encodings/delta-bit-pack-encoding.h
index 3e36af6..4bc0418 100644
--- a/src/parquet/encodings/delta-bit-pack-encoding.h
+++ b/src/parquet/encodings/delta-bit-pack-encoding.h
@@ -24,6 +24,7 @@
#include "parquet/encodings/decoder.h"
#include "parquet/util/bit-stream-utils.inline.h"
+#include "parquet/util/buffer.h"
namespace parquet_cpp {
@@ -32,8 +33,10 @@ class DeltaBitPackDecoder : public Decoder<TYPE> {
public:
typedef typename type_traits<TYPE>::value_type T;
- explicit DeltaBitPackDecoder(const ColumnDescriptor* descr)
- : Decoder<TYPE>(descr, Encoding::DELTA_BINARY_PACKED) {
+ explicit DeltaBitPackDecoder(const ColumnDescriptor* descr,
+ MemoryAllocator* allocator = default_allocator())
+ : Decoder<TYPE>(descr, Encoding::DELTA_BINARY_PACKED),
+ delta_bit_widths_(0, allocator) {
if (TYPE != Type::INT32 && TYPE != Type::INT64) {
throw ParquetException("Delta bit pack encoding should only be for integer data.");
}
@@ -61,7 +64,7 @@ class DeltaBitPackDecoder : public Decoder<TYPE> {
ParquetException::EofException();
}
if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException();
- delta_bit_widths_.resize(num_mini_blocks_);
+ delta_bit_widths_.Resize(num_mini_blocks_);
if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
for (int i = 0; i < num_mini_blocks_; ++i) {
@@ -81,7 +84,7 @@ class DeltaBitPackDecoder : public Decoder<TYPE> {
for (int i = 0; i < max_values; ++i) {
if (UNLIKELY(values_current_mini_block_ == 0)) {
++mini_block_idx_;
- if (mini_block_idx_ < delta_bit_widths_.size()) {
+ if (mini_block_idx_ < static_cast<size_t>(delta_bit_widths_.size())) {
delta_bit_width_ = delta_bit_widths_[mini_block_idx_];
values_current_mini_block_ = values_per_mini_block_;
} else {
@@ -111,7 +114,7 @@ class DeltaBitPackDecoder : public Decoder<TYPE> {
int32_t min_delta_;
size_t mini_block_idx_;
- std::vector<uint8_t> delta_bit_widths_;
+ OwnedMutableBuffer delta_bit_widths_;
int delta_bit_width_;
int32_t last_value_;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/encodings/delta-byte-array-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-byte-array-encoding.h b/src/parquet/encodings/delta-byte-array-encoding.h
index 01dceea..e42179a 100644
--- a/src/parquet/encodings/delta-byte-array-encoding.h
+++ b/src/parquet/encodings/delta-byte-array-encoding.h
@@ -28,10 +28,11 @@ namespace parquet_cpp {
class DeltaByteArrayDecoder : public Decoder<Type::BYTE_ARRAY> {
public:
- explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr)
+ explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+ MemoryAllocator* allocator = default_allocator())
: Decoder<Type::BYTE_ARRAY>(descr, Encoding::DELTA_BYTE_ARRAY),
- prefix_len_decoder_(nullptr),
- suffix_decoder_(nullptr) {
+ prefix_len_decoder_(nullptr, allocator),
+ suffix_decoder_(nullptr, allocator) {
}
virtual void SetData(int num_values, const uint8_t* data, int len) {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/encodings/delta-length-byte-array-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-length-byte-array-encoding.h b/src/parquet/encodings/delta-length-byte-array-encoding.h
index a1b4fd3..367528c 100644
--- a/src/parquet/encodings/delta-length-byte-array-encoding.h
+++ b/src/parquet/encodings/delta-length-byte-array-encoding.h
@@ -29,10 +29,10 @@ namespace parquet_cpp {
class DeltaLengthByteArrayDecoder : public Decoder<Type::BYTE_ARRAY> {
public:
- explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr)
- : Decoder<Type::BYTE_ARRAY>(descr,
- Encoding::DELTA_LENGTH_BYTE_ARRAY),
- len_decoder_(nullptr) {
+ explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr,
+ MemoryAllocator* allocator = default_allocator()) :
+ Decoder<Type::BYTE_ARRAY>(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
+ len_decoder_(nullptr, allocator) {
}
virtual void SetData(int num_values, const uint8_t* data, int len) {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/encodings/dictionary-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h
index 43669b7..07981cf 100644
--- a/src/parquet/encodings/dictionary-encoding.h
+++ b/src/parquet/encodings/dictionary-encoding.h
@@ -27,8 +27,10 @@
#include "parquet/encodings/decoder.h"
#include "parquet/encodings/encoder.h"
#include "parquet/encodings/plain-encoding.h"
+#include "parquet/util/buffer.h"
#include "parquet/util/cpu-info.h"
#include "parquet/util/hash-util.h"
+#include "parquet/util/mem-allocator.h"
#include "parquet/util/mem-pool.h"
#include "parquet/util/rle-encoding.h"
@@ -42,9 +44,10 @@ class DictionaryDecoder : public Decoder<TYPE> {
// Initializes the dictionary with values from 'dictionary'. The data in
// dictionary is not guaranteed to persist in memory after this call so the
// dictionary decoder needs to copy the data out if necessary.
- explicit DictionaryDecoder(const ColumnDescriptor* descr)
- : Decoder<TYPE>(descr, Encoding::RLE_DICTIONARY) {
- }
+ explicit DictionaryDecoder(const ColumnDescriptor* descr,
+ MemoryAllocator* allocator = default_allocator()):
+ Decoder<TYPE>(descr, Encoding::RLE_DICTIONARY), dictionary_(0, allocator),
+ byte_array_data_(0, allocator) {}
// Perform type-specific initialization
void SetDict(Decoder<TYPE>* dictionary);
@@ -77,11 +80,11 @@ class DictionaryDecoder : public Decoder<TYPE> {
}
// Only one is set.
- std::vector<T> dictionary_;
+ Vector<T> dictionary_;
// Data that contains the byte array data (byte_array_dictionary_ just has the
// pointers).
- std::vector<uint8_t> byte_array_data_;
+ OwnedMutableBuffer byte_array_data_;
RleDecoder idx_decoder_;
};
@@ -89,7 +92,7 @@ class DictionaryDecoder : public Decoder<TYPE> {
template <int TYPE>
inline void DictionaryDecoder<TYPE>::SetDict(Decoder<TYPE>* dictionary) {
int num_dictionary_values = dictionary->values_left();
- dictionary_.resize(num_dictionary_values);
+ dictionary_.Resize(num_dictionary_values);
dictionary->Decode(&dictionary_[0], num_dictionary_values);
}
@@ -103,14 +106,14 @@ template <>
inline void DictionaryDecoder<Type::BYTE_ARRAY>::SetDict(
Decoder<Type::BYTE_ARRAY>* dictionary) {
int num_dictionary_values = dictionary->values_left();
- dictionary_.resize(num_dictionary_values);
+ dictionary_.Resize(num_dictionary_values);
dictionary->Decode(&dictionary_[0], num_dictionary_values);
int total_size = 0;
for (int i = 0; i < num_dictionary_values; ++i) {
total_size += dictionary_[i].len;
}
- byte_array_data_.resize(total_size);
+ byte_array_data_.Resize(total_size);
int offset = 0;
for (int i = 0; i < num_dictionary_values; ++i) {
memcpy(&byte_array_data_[offset], dictionary_[i].ptr, dictionary_[i].len);
@@ -123,13 +126,13 @@ template <>
inline void DictionaryDecoder<Type::FIXED_LEN_BYTE_ARRAY>::SetDict(
Decoder<Type::FIXED_LEN_BYTE_ARRAY>* dictionary) {
int num_dictionary_values = dictionary->values_left();
- dictionary_.resize(num_dictionary_values);
+ dictionary_.Resize(num_dictionary_values);
dictionary->Decode(&dictionary_[0], num_dictionary_values);
int fixed_len = descr_->type_length();
int total_size = num_dictionary_values*fixed_len;
- byte_array_data_.resize(total_size);
+ byte_array_data_.Resize(total_size);
int offset = 0;
for (int i = 0; i < num_dictionary_values; ++i) {
memcpy(&byte_array_data_[offset], dictionary_[i].ptr, fixed_len);
@@ -198,12 +201,14 @@ class DictEncoderBase {
int dict_encoded_size() { return dict_encoded_size_; }
protected:
- explicit DictEncoderBase(MemPool* pool) :
+ explicit DictEncoderBase(MemPool* pool, MemoryAllocator* allocator) :
hash_table_size_(INITIAL_HASH_TABLE_SIZE),
mod_bitmask_(hash_table_size_ - 1),
- hash_slots_(hash_table_size_, HASH_SLOT_EMPTY),
+ hash_slots_(0, allocator),
+ allocator_(allocator),
pool_(pool),
dict_encoded_size_(0) {
+ hash_slots_.Assign(hash_table_size_, HASH_SLOT_EMPTY);
if (!CpuInfo::initialized()) {
CpuInfo::Init();
}
@@ -219,7 +224,8 @@ class DictEncoderBase {
// We use a fixed-size hash table with linear probing
//
// These values correspond to the uniques_ array
- std::vector<hash_slot_t> hash_slots_;
+ Vector<hash_slot_t> hash_slots_;
+ MemoryAllocator* allocator_;
// For ByteArray / FixedLenByteArray data. Not owned
MemPool* pool_;
@@ -234,9 +240,9 @@ class DictEncoderBase {
template <typename T>
class DictEncoder : public DictEncoderBase {
public:
- explicit DictEncoder(MemPool* pool = nullptr, int type_length = -1) :
- DictEncoderBase(pool),
- type_length_(type_length) { }
+ explicit DictEncoder(MemPool* pool = nullptr,
+ MemoryAllocator* allocator = default_allocator(), int type_length = -1) :
+ DictEncoderBase(pool, allocator), type_length_(type_length) {}
// TODO(wesm): think about how to address the construction semantics in
// encodings/dictionary-encoding.h
@@ -331,7 +337,8 @@ inline void DictEncoder<T>::Put(const T& v) {
template <typename T>
inline void DictEncoder<T>::DoubleTableSize() {
int new_size = hash_table_size_ * 2;
- std::vector<hash_slot_t> new_hash_slots(new_size, HASH_SLOT_EMPTY);
+ Vector<hash_slot_t> new_hash_slots(0, allocator_);
+ new_hash_slots.Assign(new_size, HASH_SLOT_EMPTY);
hash_slot_t index, slot;
int j;
for (int i = 0; i < hash_table_size_; ++i) {
@@ -360,7 +367,8 @@ inline void DictEncoder<T>::DoubleTableSize() {
hash_table_size_ = new_size;
mod_bitmask_ = new_size - 1;
- new_hash_slots.swap(hash_slots_);
+
+ hash_slots_.Swap(new_hash_slots);
}
template<typename T>
@@ -376,7 +384,6 @@ inline void DictEncoder<ByteArray>::AddDictKey(const ByteArray& v) {
throw ParquetException("out of memory");
}
memcpy(heap, v.ptr, v.len);
-
uniques_.push_back(ByteArray(v.len, heap));
dict_encoded_size_ += v.len + sizeof(uint32_t);
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/encodings/encoder.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoder.h b/src/parquet/encodings/encoder.h
index ce91a29..b7201d5 100644
--- a/src/parquet/encodings/encoder.h
+++ b/src/parquet/encodings/encoder.h
@@ -43,12 +43,13 @@ class Encoder {
protected:
explicit Encoder(const ColumnDescriptor* descr,
- const Encoding::type& encoding)
- : descr_(descr), encoding_(encoding) {}
+ const Encoding::type& encoding, MemoryAllocator* allocator)
+ : descr_(descr), encoding_(encoding), allocator_(allocator) {}
// For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
const ColumnDescriptor* descr_;
const Encoding::type encoding_;
+ MemoryAllocator* allocator_;
};
} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/encodings/encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoding-test.cc b/src/parquet/encodings/encoding-test.cc
index 490047c..96d1b29 100644
--- a/src/parquet/encodings/encoding-test.cc
+++ b/src/parquet/encodings/encoding-test.cc
@@ -151,6 +151,7 @@ class TestEncodingBase : public ::testing::Test {
if (descr_) {
type_length_ = descr_->type_length();
}
+ allocator_ = default_allocator();
}
void TearDown() {
@@ -182,6 +183,7 @@ class TestEncodingBase : public ::testing::Test {
protected:
MemPool pool_;
+ MemoryAllocator* allocator_;
int num_values_;
int type_length_;
@@ -199,6 +201,7 @@ class TestEncodingBase : public ::testing::Test {
// out an alternative to this class layering at some point
#define USING_BASE_MEMBERS() \
using TestEncodingBase<Type>::pool_; \
+ using TestEncodingBase<Type>::allocator_; \
using TestEncodingBase<Type>::descr_; \
using TestEncodingBase<Type>::num_values_; \
using TestEncodingBase<Type>::draws_; \
@@ -252,7 +255,7 @@ class TestDictionaryEncoding : public TestEncodingBase<Type> {
static constexpr int TYPE = Type::type_num;
void CheckRoundtrip() {
- DictEncoder<T> encoder(&pool_, type_length_);
+ DictEncoder<T> encoder(&pool_, allocator_, type_length_);
dict_buffer_ = std::make_shared<OwnedMutableBuffer>();
auto indices = std::make_shared<OwnedMutableBuffer>();
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index 95c353c..6d63023 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -25,6 +25,7 @@
#include "parquet/encodings/encoder.h"
#include "parquet/schema/descriptor.h"
#include "parquet/util/bit-stream-utils.inline.h"
+#include "parquet/util/buffer.h"
#include "parquet/util/output.h"
namespace parquet_cpp {
@@ -172,8 +173,9 @@ class PlainEncoder : public Encoder<TYPE> {
public:
typedef typename type_traits<TYPE>::value_type T;
- explicit PlainEncoder(const ColumnDescriptor* descr) :
- Encoder<TYPE>(descr, Encoding::PLAIN) {}
+ explicit PlainEncoder(const ColumnDescriptor* descr,
+ MemoryAllocator* allocator = default_allocator()) :
+ Encoder<TYPE>(descr, Encoding::PLAIN, allocator) {}
void Encode(const T* src, int num_values, OutputStream* dst);
};
@@ -181,12 +183,13 @@ class PlainEncoder : public Encoder<TYPE> {
template <>
class PlainEncoder<Type::BOOLEAN> : public Encoder<Type::BOOLEAN> {
public:
- explicit PlainEncoder(const ColumnDescriptor* descr) :
- Encoder<Type::BOOLEAN>(descr, Encoding::PLAIN) {}
+ explicit PlainEncoder(const ColumnDescriptor* descr,
+ MemoryAllocator* allocator = default_allocator()) :
+ Encoder<Type::BOOLEAN>(descr, Encoding::PLAIN, allocator) {}
virtual void Encode(const bool* src, int num_values, OutputStream* dst) {
int bytes_required = BitUtil::Ceil(num_values, 8);
- std::vector<uint8_t> tmp_buffer(bytes_required);
+ OwnedMutableBuffer tmp_buffer(bytes_required, allocator_);
BitWriter bit_writer(&tmp_buffer[0], bytes_required);
for (int i = 0; i < num_values; ++i) {
@@ -205,7 +208,7 @@ class PlainEncoder<Type::BOOLEAN> : public Encoder<Type::BOOLEAN> {
// Use a temporary buffer for now and copy, because the BitWriter is not
// aware of OutputStream. Later we can add some kind of Request/Flush API
// to OutputStream
- std::vector<uint8_t> tmp_buffer(bytes_required);
+ OwnedMutableBuffer tmp_buffer(bytes_required, allocator_);
BitWriter bit_writer(&tmp_buffer[0], bytes_required);
for (int i = 0; i < num_values; ++i) {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/file/file-deserialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc
index 2a7c347..6b7755c 100644
--- a/src/parquet/file/file-deserialize-test.cc
+++ b/src/parquet/file/file-deserialize-test.cc
@@ -87,7 +87,7 @@ class TestPageSerde : public ::testing::Test {
}
void ResetStream() {
- out_stream_.reset(new InMemoryOutputStream());
+ out_stream_.reset(new InMemoryOutputStream);
}
void EndStream() {
@@ -244,7 +244,8 @@ class TestParquetFileReader : public ::testing::Test {
std::unique_ptr<BufferReader> reader(new BufferReader(buffer));
reader_.reset(new ParquetFileReader());
- ASSERT_THROW(reader_->Open(SerializedFile::Open(std::move(reader))),
+ ASSERT_THROW(
+ reader_->Open(SerializedFile::Open(std::move(reader))),
ParquetException);
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index 89e8298..4c192f4 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -42,8 +42,9 @@ namespace parquet_cpp {
// assembled in a serialized stream for storing in a Parquet file
SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream,
- Compression::type codec_type) :
- stream_(std::move(stream)) {
+ Compression::type codec_type, MemoryAllocator* allocator) :
+ stream_(std::move(stream)),
+ decompression_buffer_(0, allocator) {
max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE;
decompressor_ = Codec::Create(codec_type);
}
@@ -97,7 +98,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
if (decompressor_ != NULL) {
// Grow the uncompressed buffer if we need to.
if (uncompressed_len > static_cast<int>(decompression_buffer_.size())) {
- decompression_buffer_.resize(uncompressed_len);
+ decompression_buffer_.Resize(uncompressed_len);
}
decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
&decompression_buffer_[0]);
@@ -181,7 +182,7 @@ std::unique_ptr<PageReader> SerializedRowGroup::GetColumnPageReader(int i) {
std::unique_ptr<InputStream> stream(new InMemoryInputStream(buffer));
return std::unique_ptr<PageReader>(new SerializedPageReader(std::move(stream),
- FromThrift(col.meta_data.codec)));
+ FromThrift(col.meta_data.codec), allocator_));
}
RowGroupStatistics SerializedRowGroup::GetColumnStats(int i) {
@@ -204,9 +205,9 @@ static constexpr uint32_t FOOTER_SIZE = 8;
static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
std::unique_ptr<ParquetFileReader::Contents> SerializedFile::Open(
- std::unique_ptr<RandomAccessSource> source) {
+ std::unique_ptr<RandomAccessSource> source, MemoryAllocator* allocator) {
std::unique_ptr<ParquetFileReader::Contents> result(
- new SerializedFile(std::move(source)));
+ new SerializedFile(std::move(source), allocator));
// Access private methods here, but otherwise unavailable
SerializedFile* file = static_cast<SerializedFile*>(result.get());
@@ -227,9 +228,9 @@ SerializedFile::~SerializedFile() {
std::shared_ptr<RowGroupReader> SerializedFile::GetRowGroup(int i) {
std::unique_ptr<SerializedRowGroup> contents(new SerializedRowGroup(source_.get(),
- &metadata_.row_groups[i]));
+ &metadata_.row_groups[i], allocator_));
- return std::make_shared<RowGroupReader>(&schema_, std::move(contents));
+ return std::make_shared<RowGroupReader>(&schema_, std::move(contents), allocator_);
}
int64_t SerializedFile::num_rows() const {
@@ -244,8 +245,10 @@ int SerializedFile::num_row_groups() const {
return metadata_.row_groups.size();
}
-SerializedFile::SerializedFile(std::unique_ptr<RandomAccessSource> source) :
- source_(std::move(source)) {}
+SerializedFile::SerializedFile(
+ std::unique_ptr<RandomAccessSource> source,
+ MemoryAllocator* allocator = default_allocator()) :
+ source_(std::move(source)), allocator_(allocator) {}
void SerializedFile::ParseMetaData() {
@@ -271,7 +274,7 @@ void SerializedFile::ParseMetaData() {
}
source_->Seek(metadata_start);
- std::vector<uint8_t> metadata_buffer(metadata_len);
+ OwnedMutableBuffer metadata_buffer(metadata_len, allocator_);
bytes_read = source_->Read(metadata_len, &metadata_buffer[0]);
if (bytes_read != metadata_len) {
throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/file/reader-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h
index b62f249..847c8a9 100644
--- a/src/parquet/file/reader-internal.h
+++ b/src/parquet/file/reader-internal.h
@@ -43,7 +43,7 @@ static constexpr uint32_t DEFAULT_PAGE_HEADER_SIZE = 16 * 1024;
class SerializedPageReader : public PageReader {
public:
SerializedPageReader(std::unique_ptr<InputStream> stream,
- Compression::type codec);
+ Compression::type codec, MemoryAllocator* allocator = default_allocator());
virtual ~SerializedPageReader() {}
@@ -62,7 +62,7 @@ class SerializedPageReader : public PageReader {
// Compression codec to use.
std::unique_ptr<Codec> decompressor_;
- std::vector<uint8_t> decompression_buffer_;
+ OwnedMutableBuffer decompression_buffer_;
// Maximum allowed page size
uint32_t max_page_header_size_;
};
@@ -71,9 +71,10 @@ class SerializedPageReader : public PageReader {
class SerializedRowGroup : public RowGroupReader::Contents {
public:
SerializedRowGroup(RandomAccessSource* source,
- const parquet::RowGroup* metadata) :
+ const parquet::RowGroup* metadata, MemoryAllocator* allocator) :
source_(source),
- metadata_(metadata) {}
+ metadata_(metadata),
+ allocator_(allocator) {}
virtual int num_columns() const;
virtual int64_t num_rows() const;
@@ -83,6 +84,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
private:
RandomAccessSource* source_;
const parquet::RowGroup* metadata_;
+ MemoryAllocator* allocator_;
};
// An implementation of ParquetFileReader::Contents that deals with the Parquet
@@ -95,7 +97,8 @@ class SerializedFile : public ParquetFileReader::Contents {
// This class does _not_ take ownership of the data source. You must manage its
// lifetime separately
static std::unique_ptr<ParquetFileReader::Contents> Open(
- std::unique_ptr<RandomAccessSource> source);
+ std::unique_ptr<RandomAccessSource> source,
+ MemoryAllocator* allocator = default_allocator());
virtual void Close();
virtual std::shared_ptr<RowGroupReader> GetRowGroup(int i);
virtual int64_t num_rows() const;
@@ -105,10 +108,12 @@ class SerializedFile : public ParquetFileReader::Contents {
private:
// This class takes ownership of the provided data source
- explicit SerializedFile(std::unique_ptr<RandomAccessSource> source);
+ explicit SerializedFile(std::unique_ptr<RandomAccessSource> source,
+ MemoryAllocator* allocator);
std::unique_ptr<RandomAccessSource> source_;
parquet::FileMetaData metadata_;
+ MemoryAllocator* allocator_;
void ParseMetaData();
};
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/file/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index 2937f9e..9020008 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -41,9 +41,10 @@ namespace parquet_cpp {
// RowGroupReader public API
RowGroupReader::RowGroupReader(const SchemaDescriptor* schema,
- std::unique_ptr<Contents> contents) :
+ std::unique_ptr<Contents> contents, MemoryAllocator* allocator) :
schema_(schema),
- contents_(std::move(contents)) {}
+ contents_(std::move(contents)),
+ allocator_(allocator) {}
int RowGroupReader::num_columns() const {
return contents_->num_columns();
@@ -58,7 +59,7 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
const ColumnDescriptor* descr = schema_->Column(i);
std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
- return ColumnReader::Make(descr, std::move(page_reader));
+ return ColumnReader::Make(descr, std::move(page_reader), allocator_);
}
RowGroupStatistics RowGroupReader::GetColumnStats(int i) const {
@@ -74,16 +75,16 @@ ParquetFileReader::~ParquetFileReader() {
}
std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(const std::string& path,
- bool memory_map) {
+ bool memory_map, MemoryAllocator* allocator) {
std::unique_ptr<LocalFileSource> file;
if (memory_map) {
- file.reset(new MemoryMapSource());
+ file.reset(new MemoryMapSource(allocator));
} else {
- file.reset(new LocalFileSource());
+ file.reset(new LocalFileSource(allocator));
}
file->Open(path);
- auto contents = SerializedFile::Open(std::move(file));
+ auto contents = SerializedFile::Open(std::move(file), allocator);
std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
result->Open(std::move(contents));
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/file/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h
index 436d1e8..f4455ac 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file/reader.h
@@ -48,7 +48,8 @@ class RowGroupReader {
virtual std::unique_ptr<PageReader> GetColumnPageReader(int i) = 0;
};
- RowGroupReader(const SchemaDescriptor* schema, std::unique_ptr<Contents> contents);
+ RowGroupReader(const SchemaDescriptor* schema,
+ std::unique_ptr<Contents> contents, MemoryAllocator* allocator);
// Construct a ColumnReader for the indicated row group-relative
// column. Ownership is shared with the RowGroupReader.
@@ -66,6 +67,8 @@ class RowGroupReader {
// This is declared in the .cc file so that we can hide compiled Thrift
// headers from the public API and also more easily create test fixtures.
std::unique_ptr<Contents> contents_;
+
+ MemoryAllocator* allocator_;
};
@@ -95,7 +98,7 @@ class ParquetFileReader {
// API Convenience to open a serialized Parquet file on disk
static std::unique_ptr<ParquetFileReader> OpenFile(const std::string& path,
- bool memory_map = true);
+ bool memory_map = true, MemoryAllocator* allocator = default_allocator());
void Open(std::unique_ptr<Contents> contents);
void Close();
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index c273487..10bcff7 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -28,6 +28,7 @@
#include "parquet/column/reader.h"
#include "parquet/column/scanner.h"
#include "parquet/util/input.h"
+#include "parquet/util/mem-allocator.h"
using std::string;
@@ -93,7 +94,8 @@ TEST_F(TestAllTypesPlain, TestFlatScannerInt32) {
std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0);
// column 0, id
- std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0)));
+ std::shared_ptr<Int32Scanner> scanner(
+ new Int32Scanner(group->Column(0)));
int32_t val;
bool is_null;
for (int i = 0; i < 8; ++i) {
@@ -110,7 +112,8 @@ TEST_F(TestAllTypesPlain, TestSetScannerBatchSize) {
std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0);
// column 0, id
- std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0)));
+ std::shared_ptr<Int32Scanner> scanner(
+ new Int32Scanner(group->Column(0)));
ASSERT_EQ(128, scanner->batch_size());
scanner->SetBatchSize(1024);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/util/CMakeLists.txt b/src/parquet/util/CMakeLists.txt
index 5cb7b2f..b4faaa1 100644
--- a/src/parquet/util/CMakeLists.txt
+++ b/src/parquet/util/CMakeLists.txt
@@ -28,6 +28,7 @@ install(FILES
input.h
logging.h
macros.h
+ mem-allocator.h
mem-pool.h
output.h
rle-encoding.h
@@ -39,6 +40,7 @@ add_library(parquet_util STATIC
buffer.cc
cpu-info.cc
input.cc
+ mem-allocator.cc
mem-pool.cc
output.cc
)
@@ -64,5 +66,6 @@ endif()
ADD_PARQUET_TEST(bit-util-test)
ADD_PARQUET_TEST(buffer-test)
ADD_PARQUET_TEST(input-output-test)
+ADD_PARQUET_TEST(mem-allocator-test)
ADD_PARQUET_TEST(mem-pool-test)
ADD_PARQUET_TEST(rle-test)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/buffer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/buffer-test.cc b/src/parquet/util/buffer-test.cc
index f494326..af6d54c 100644
--- a/src/parquet/util/buffer-test.cc
+++ b/src/parquet/util/buffer-test.cc
@@ -48,8 +48,6 @@ TEST_F(TestBuffer, Resize) {
}
TEST_F(TestBuffer, ResizeOOM) {
- // realloc fails, even though there may be no explicit limit
-
// Tests that deliberately throw Exceptions foul up valgrind and report
// red herring memory leaks
#ifndef PARQUET_VALGRIND
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/buffer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/buffer.cc b/src/parquet/util/buffer.cc
index 6647c15..34db8df 100644
--- a/src/parquet/util/buffer.cc
+++ b/src/parquet/util/buffer.cc
@@ -17,9 +17,11 @@
#include "parquet/util/buffer.h"
+#include <algorithm>
#include <cstdint>
#include "parquet/exception.h"
+#include "parquet/types.h"
namespace parquet_cpp {
@@ -34,18 +36,90 @@ std::shared_ptr<Buffer> MutableBuffer::GetImmutableView() {
return std::make_shared<Buffer>(this->get_shared_ptr(), 0, size());
}
-OwnedMutableBuffer::OwnedMutableBuffer() :
- ResizableBuffer(nullptr, 0) {}
+OwnedMutableBuffer::OwnedMutableBuffer(int64_t size, MemoryAllocator* allocator) :
+ ResizableBuffer(nullptr, 0), allocator_(allocator) {
+ Resize(size);
+}
+
+OwnedMutableBuffer::~OwnedMutableBuffer() {
+ if (mutable_data_) {
+ allocator_->Free(mutable_data_, capacity_);
+ }
+}
+
+void OwnedMutableBuffer::Reserve(int64_t new_capacity) {
+ if (!mutable_data_ || new_capacity > capacity_) {
+ if (mutable_data_) {
+ uint8_t* new_data = allocator_->Malloc(new_capacity);
+ memcpy(new_data, mutable_data_, size_);
+ allocator_->Free(mutable_data_, capacity_);
+ mutable_data_ = new_data;
+ } else {
+ mutable_data_ = allocator_->Malloc(new_capacity);
+ }
+ data_ = mutable_data_;
+ capacity_ = new_capacity;
+ }
+}
void OwnedMutableBuffer::Resize(int64_t new_size) {
- try {
- buffer_owner_.resize(new_size);
- } catch (const std::bad_alloc& e) {
- throw ParquetException("OOM: resize failed");
+ Reserve(new_size);
+ size_ = new_size;
+}
+
+uint8_t& OwnedMutableBuffer::operator[](int64_t i) {
+ return mutable_data_[i];
+}
+
+template <class T>
+Vector<T>::Vector(int64_t size, MemoryAllocator* allocator) :
+ buffer_(new OwnedMutableBuffer(size * sizeof(T), allocator)),
+ size_(size), capacity_(size) {
+ if (size > 0) {
+ data_ = reinterpret_cast<T*>(buffer_->mutable_data());
+ } else {
+ data_ = nullptr;
}
+}
+
+template <class T>
+void Vector<T>::Reserve(int64_t new_capacity) {
+ if (new_capacity > capacity_) {
+ buffer_->Resize(new_capacity * sizeof(T));
+ data_ = reinterpret_cast<T*>(buffer_->mutable_data());
+ capacity_ = new_capacity;
+ }
+}
+
+template <class T>
+void Vector<T>::Resize(int64_t new_size) {
+ Reserve(new_size);
size_ = new_size;
- data_ = buffer_owner_.data();
- mutable_data_ = buffer_owner_.data();
}
+template <class T>
+void Vector<T>::Assign(int64_t size, const T val) {
+ Resize(size);
+ for (int64_t i = 0; i < size_; i++) {
+ data_[i] = val;
+ }
+}
+
+template <class T>
+void Vector<T>::Swap(Vector<T>& v) {
+ buffer_.swap(v.buffer_);
+ std::swap(size_, v.size_);
+ std::swap(capacity_, v.capacity_);
+ std::swap(data_, v.data_);
+}
+
+template class Vector<int32_t>;
+template class Vector<int64_t>;
+template class Vector<bool>;
+template class Vector<float>;
+template class Vector<double>;
+template class Vector<Int96>;
+template class Vector<ByteArray>;
+template class Vector<FixedLenByteArray>;
+
} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/buffer.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/buffer.h b/src/parquet/util/buffer.h
index 8be2b17..7ea2122 100644
--- a/src/parquet/util/buffer.h
+++ b/src/parquet/util/buffer.h
@@ -25,6 +25,7 @@
#include <vector>
#include "parquet/util/macros.h"
+#include "parquet/util/mem-allocator.h"
namespace parquet_cpp {
@@ -119,7 +120,8 @@ class ResizableBuffer : public MutableBuffer {
protected:
ResizableBuffer(uint8_t* data, int64_t size) :
- MutableBuffer(data, size) {}
+ MutableBuffer(data, size), capacity_(size) {}
+ int64_t capacity_;
};
// A ResizableBuffer whose memory is owned by the class instance. For example,
@@ -127,12 +129,39 @@ class ResizableBuffer : public MutableBuffer {
// garbage-collected
class OwnedMutableBuffer : public ResizableBuffer {
public:
- OwnedMutableBuffer();
- virtual void Resize(int64_t new_size);
+ explicit OwnedMutableBuffer(int64_t size = 0,
+ MemoryAllocator* allocator = default_allocator());
+ virtual ~OwnedMutableBuffer();
+ void Resize(int64_t new_size) override;
+ void Reserve(int64_t new_capacity);
+ uint8_t& operator[](int64_t i);
private:
// TODO: aligned allocations
- std::vector<uint8_t> buffer_owner_;
+ MemoryAllocator* allocator_;
+
+ DISALLOW_COPY_AND_ASSIGN(OwnedMutableBuffer);
+};
+
+template <class T>
+class Vector {
+ public:
+ explicit Vector(int64_t size, MemoryAllocator* allocator);
+ void Resize(int64_t new_size);
+ void Reserve(int64_t new_capacity);
+ void Assign(int64_t size, const T val);
+ void Swap(Vector<T>& v);
+ inline T& operator[](int64_t i) {
+ return data_[i];
+ }
+
+ private:
+ std::unique_ptr<OwnedMutableBuffer> buffer_;
+ int64_t size_;
+ int64_t capacity_;
+ T* data_;
+
+ DISALLOW_COPY_AND_ASSIGN(Vector);
};
} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/input-output-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input-output-test.cc b/src/parquet/util/input-output-test.cc
index 424be3a..b5c412c 100644
--- a/src/parquet/util/input-output-test.cc
+++ b/src/parquet/util/input-output-test.cc
@@ -28,6 +28,7 @@
#include "parquet/exception.h"
#include "parquet/util/buffer.h"
#include "parquet/util/input.h"
+#include "parquet/util/mem-allocator.h"
#include "parquet/util/output.h"
#include "parquet/util/test-common.h"
@@ -118,7 +119,6 @@ TYPED_TEST(TestFileReaders, FileDisappeared) {
TYPED_TEST(TestFileReaders, BadSeek) {
this->source.Open(this->test_path_);
-
ASSERT_THROW(this->source.Seek(this->filesize_ + 1), ParquetException);
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/input.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.cc b/src/parquet/util/input.cc
index 897d81c..0c52c16 100644
--- a/src/parquet/util/input.cc
+++ b/src/parquet/util/input.cc
@@ -35,6 +35,10 @@ std::shared_ptr<Buffer> RandomAccessSource::ReadAt(int64_t pos, int64_t nbytes)
return Read(nbytes);
}
+int64_t RandomAccessSource::Size() const {
+ return size_;
+}
+
// ----------------------------------------------------------------------
// LocalFileSource
@@ -86,10 +90,6 @@ void LocalFileSource::Seek(int64_t pos) {
SeekFile(pos);
}
-int64_t LocalFileSource::Size() const {
- return size_;
-}
-
int64_t LocalFileSource::Tell() const {
int64_t position = ftell(file_);
if (position < 0) {
@@ -107,7 +107,7 @@ int64_t LocalFileSource::Read(int64_t nbytes, uint8_t* buffer) {
}
std::shared_ptr<Buffer> LocalFileSource::Read(int64_t nbytes) {
- auto result = std::make_shared<OwnedMutableBuffer>();
+ auto result = std::make_shared<OwnedMutableBuffer>(0, allocator_);
result->Resize(nbytes);
int64_t bytes_read = Read(nbytes, result->mutable_data());
@@ -198,10 +198,6 @@ void BufferReader::Seek(int64_t pos) {
pos_ = pos;
}
-int64_t BufferReader::Size() const {
- return size_;
-}
-
int64_t BufferReader::Read(int64_t nbytes, uint8_t* out) {
ParquetException::NYI("not implemented");
return 0;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/input.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.h b/src/parquet/util/input.h
index 80fb730..cf015ee 100644
--- a/src/parquet/util/input.h
+++ b/src/parquet/util/input.h
@@ -18,6 +18,8 @@
#ifndef PARQUET_UTIL_INPUT_H
#define PARQUET_UTIL_INPUT_H
+#include <parquet/util/mem-allocator.h>
+
#include <cstdint>
#include <cstdio>
#include <memory>
@@ -36,11 +38,10 @@ class RandomAccessSource {
public:
virtual ~RandomAccessSource() {}
- virtual int64_t Size() const = 0;
-
virtual void Close() = 0;
virtual int64_t Tell() const = 0;
virtual void Seek(int64_t pos) = 0;
+ int64_t Size() const;
// Returns actual number of bytes read
virtual int64_t Read(int64_t nbytes, uint8_t* out) = 0;
@@ -55,13 +56,14 @@ class RandomAccessSource {
class LocalFileSource : public RandomAccessSource {
public:
- LocalFileSource() : file_(nullptr), is_open_(false) {}
+ explicit LocalFileSource(MemoryAllocator* allocator = default_allocator()) :
+ file_(nullptr), is_open_(false), allocator_(allocator) {}
+
virtual ~LocalFileSource();
virtual void Open(const std::string& path);
virtual void Close();
- virtual int64_t Size() const;
virtual int64_t Tell() const;
virtual void Seek(int64_t pos);
@@ -83,14 +85,13 @@ class LocalFileSource : public RandomAccessSource {
std::string path_;
FILE* file_;
bool is_open_;
+ MemoryAllocator* allocator_;
};
class MemoryMapSource : public LocalFileSource {
public:
- MemoryMapSource() :
- LocalFileSource(),
- data_(nullptr),
- pos_(0) {}
+ explicit MemoryMapSource(MemoryAllocator* allocator = default_allocator()) :
+ LocalFileSource(allocator), data_(nullptr), pos_(0) {}
virtual ~MemoryMapSource();
@@ -123,7 +124,6 @@ class BufferReader : public RandomAccessSource {
virtual void Close() {}
virtual int64_t Tell() const;
virtual void Seek(int64_t pos);
- virtual int64_t Size() const;
virtual int64_t Read(int64_t nbytes, uint8_t* out);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/mem-allocator-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-allocator-test.cc b/src/parquet/util/mem-allocator-test.cc
new file mode 100644
index 0000000..2101176
--- /dev/null
+++ b/src/parquet/util/mem-allocator-test.cc
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "parquet/exception.h"
+#include "parquet/util/mem-allocator.h"
+
+namespace parquet_cpp {
+
+TEST(TestAllocator, AllocateFree) {
+ TrackingAllocator allocator;
+
+ uint8_t* data = allocator.Malloc(100);
+ ASSERT_TRUE(nullptr != data);
+ data[99] = 55;
+ allocator.Free(data, 100);
+
+ data = allocator.Malloc(0);
+ ASSERT_EQ(nullptr, data);
+ allocator.Free(data, 0);
+
+ data = allocator.Malloc(1);
+ ASSERT_THROW(allocator.Free(data, 2), ParquetException);
+ ASSERT_NO_THROW(allocator.Free(data, 1));
+
+ int64_t to_alloc = std::numeric_limits<int64_t>::max();
+ ASSERT_THROW(allocator.Malloc(to_alloc), ParquetException);
+}
+
+TEST(TestAllocator, TotalMax) {
+ TrackingAllocator allocator;
+ ASSERT_EQ(0, allocator.TotalMemory());
+ ASSERT_EQ(0, allocator.MaxMemory());
+
+ uint8_t* data = allocator.Malloc(100);
+ ASSERT_EQ(100, allocator.TotalMemory());
+ ASSERT_EQ(100, allocator.MaxMemory());
+
+ uint8_t* data2 = allocator.Malloc(10);
+ ASSERT_EQ(110, allocator.TotalMemory());
+ ASSERT_EQ(110, allocator.MaxMemory());
+
+ allocator.Free(data, 100);
+ ASSERT_EQ(10, allocator.TotalMemory());
+ ASSERT_EQ(110, allocator.MaxMemory());
+
+ allocator.Free(data2, 10);
+ ASSERT_EQ(0, allocator.TotalMemory());
+ ASSERT_EQ(110, allocator.MaxMemory());
+}
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/mem-allocator.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-allocator.cc b/src/parquet/util/mem-allocator.cc
new file mode 100644
index 0000000..2bffff9
--- /dev/null
+++ b/src/parquet/util/mem-allocator.cc
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/util/mem-allocator.h"
+
+#include <cstdlib>
+
+#include "parquet/exception.h"
+
+namespace parquet_cpp {
+
+MemoryAllocator::~MemoryAllocator() {}
+
+uint8_t* TrackingAllocator::Malloc(int64_t size) {
+ if (0 == size) {
+ return nullptr;
+ }
+
+ uint8_t* p = static_cast<uint8_t*>(std::malloc(size));
+ if (!p) {
+ throw ParquetException("OOM: memory allocation failed");
+ }
+ total_memory_ += size;
+ if (total_memory_ > max_memory_) {
+ max_memory_ = total_memory_;
+ }
+ return p;
+}
+
+void TrackingAllocator::Free(uint8_t* p, int64_t size) {
+ if (nullptr != p && size > 0) {
+ if (total_memory_ < size) {
+ throw ParquetException("Attempting to free too much memory");
+ }
+ total_memory_ -= size;
+ std::free(p);
+ }
+}
+
+TrackingAllocator::~TrackingAllocator() {}
+
+MemoryAllocator* default_allocator() {
+ static TrackingAllocator default_allocator;
+ return &default_allocator;
+}
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/mem-allocator.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-allocator.h b/src/parquet/util/mem-allocator.h
new file mode 100644
index 0000000..076a8e0
--- /dev/null
+++ b/src/parquet/util/mem-allocator.h
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_UTIL_MEMORY_POOL_H
+#define PARQUET_UTIL_MEMORY_POOL_H
+
+#include "parquet/util/logging.h"
+#include "parquet/util/bit-util.h"
+
+namespace parquet_cpp {
+
+class MemoryAllocator {
+ public:
+ virtual ~MemoryAllocator();
+
+ // Returns nullptr if size is 0
+ virtual uint8_t* Malloc(int64_t size) = 0;
+ virtual void Free(uint8_t* p, int64_t size) = 0;
+};
+
+MemoryAllocator* default_allocator();
+
+class TrackingAllocator: public MemoryAllocator {
+ public:
+ TrackingAllocator() : total_memory_(0), max_memory_(0) {}
+ virtual ~TrackingAllocator();
+
+ uint8_t* Malloc(int64_t size) override;
+ void Free(uint8_t* p, int64_t size) override;
+
+ int64_t TotalMemory() {
+ return total_memory_;
+ }
+
+ int64_t MaxMemory() {
+ return max_memory_;
+ }
+
+ private:
+ int64_t total_memory_;
+ int64_t max_memory_;
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_UTIL_MEMORY_POOL_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/mem-pool.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-pool.cc b/src/parquet/util/mem-pool.cc
index f8626bc..2f78a18 100644
--- a/src/parquet/util/mem-pool.cc
+++ b/src/parquet/util/mem-pool.cc
@@ -34,12 +34,13 @@ namespace parquet_cpp {
const int MemPool::INITIAL_CHUNK_SIZE;
const int MemPool::MAX_CHUNK_SIZE;
-MemPool::MemPool()
+MemPool::MemPool(MemoryAllocator* allocator)
: current_chunk_idx_(-1),
next_chunk_size_(INITIAL_CHUNK_SIZE),
total_allocated_bytes_(0),
peak_allocated_bytes_(0),
- total_reserved_bytes_(0) {}
+ total_reserved_bytes_(0),
+ allocator_(allocator) {}
MemPool::ChunkInfo::ChunkInfo(int64_t size, uint8_t* buf)
: data(buf),
@@ -51,7 +52,7 @@ MemPool::~MemPool() {
int64_t total_bytes_released = 0;
for (size_t i = 0; i < chunks_.size(); ++i) {
total_bytes_released += chunks_[i].size;
- free(chunks_[i].data);
+ allocator_->Free(chunks_[i].data, chunks_[i].size);
}
DCHECK(chunks_.empty()) << "Must call FreeAll() or AcquireData() for this pool";
@@ -70,7 +71,7 @@ void MemPool::FreeAll() {
int64_t total_bytes_released = 0;
for (size_t i = 0; i < chunks_.size(); ++i) {
total_bytes_released += chunks_[i].size;
- free(chunks_[i].data);
+ allocator_->Free(chunks_[i].data, chunks_[i].size);
}
chunks_.clear();
next_chunk_size_ = INITIAL_CHUNK_SIZE;
@@ -108,7 +109,7 @@ bool MemPool::FindChunk(int64_t min_size) {
chunk_size = std::max<int64_t>(min_size, next_chunk_size_);
// Allocate a new chunk. Return early if malloc fails.
- uint8_t* buf = reinterpret_cast<uint8_t*>(malloc(chunk_size));
+ uint8_t* buf = allocator_->Malloc(chunk_size);
if (UNLIKELY(buf == NULL)) {
DCHECK_EQ(current_chunk_idx_, static_cast<int>(chunks_.size()));
current_chunk_idx_ = static_cast<int>(chunks_.size()) - 1;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/mem-pool.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-pool.h b/src/parquet/util/mem-pool.h
index 3f21aa7..c81712b 100644
--- a/src/parquet/util/mem-pool.h
+++ b/src/parquet/util/mem-pool.h
@@ -29,11 +29,12 @@
#include "parquet/util/logging.h"
#include "parquet/util/bit-util.h"
+#include "parquet/util/mem-allocator.h"
namespace parquet_cpp {
-/// A MemPool maintains a list of memory chunks from which it allocates memory in
-/// response to Allocate() calls;
+/// A MemPool maintains a list of memory chunks from which it allocates memory
+/// in response to Allocate() calls;
/// Chunks stay around for the lifetime of the mempool or until they are passed on to
/// another mempool.
//
@@ -75,7 +76,7 @@ namespace parquet_cpp {
class MemPool {
public:
- MemPool();
+ explicit MemPool(MemoryAllocator* allocator = default_allocator());
/// Frees all chunks of memory and subtracts the total allocated bytes
/// from the registered limits.
@@ -165,6 +166,8 @@ class MemPool {
std::vector<ChunkInfo> chunks_;
+ MemoryAllocator* allocator_;
+
/// Find or allocated a chunk with at least min_size spare capacity and update
/// current_chunk_idx_. Also updates chunks_, chunk_sizes_ and allocated_bytes_
/// if a new chunk needs to be created.
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/output.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/output.cc b/src/parquet/util/output.cc
index b28421e..fa641fe 100644
--- a/src/parquet/util/output.cc
+++ b/src/parquet/util/output.cc
@@ -28,21 +28,14 @@ namespace parquet_cpp {
// ----------------------------------------------------------------------
// In-memory output stream
-static constexpr int64_t IN_MEMORY_DEFAULT_CAPACITY = 1024;
-
-InMemoryOutputStream::InMemoryOutputStream(int64_t initial_capacity) :
- size_(0),
- capacity_(initial_capacity) {
+InMemoryOutputStream::InMemoryOutputStream(int64_t initial_capacity,
+ MemoryAllocator* allocator) : size_(0), capacity_(initial_capacity) {
if (initial_capacity == 0) {
initial_capacity = IN_MEMORY_DEFAULT_CAPACITY;
}
- buffer_.reset(new OwnedMutableBuffer());
- buffer_->Resize(initial_capacity);
+ buffer_.reset(new OwnedMutableBuffer(initial_capacity, allocator));
}
-InMemoryOutputStream::InMemoryOutputStream() :
- InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY) {}
-
uint8_t* InMemoryOutputStream::Head() {
return buffer_->mutable_data() + size_;
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/output.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/output.h b/src/parquet/util/output.h
index b466e0e..bc320a4 100644
--- a/src/parquet/util/output.h
+++ b/src/parquet/util/output.h
@@ -22,6 +22,7 @@
#include <memory>
#include "parquet/util/macros.h"
+#include "parquet/util/mem-allocator.h"
namespace parquet_cpp {
@@ -44,12 +45,13 @@ class OutputStream {
virtual void Write(const uint8_t* data, int64_t length) = 0;
};
+static constexpr int64_t IN_MEMORY_DEFAULT_CAPACITY = 1024;
// An output stream that is an in-memory
class InMemoryOutputStream : public OutputStream {
public:
- InMemoryOutputStream();
- explicit InMemoryOutputStream(int64_t initial_capacity);
+ explicit InMemoryOutputStream(int64_t initial_capacity = IN_MEMORY_DEFAULT_CAPACITY,
+ MemoryAllocator* allocator = default_allocator());
// Close is currently a no-op with the in-memory stream
virtual void Close() {}