You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2016/12/30 16:36:37 UTC
[3/4] parquet-cpp git commit: PARQUET-818: Refactoring to utilize
common IO, buffer, memory management abstractions and implementations
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/scanner.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner.h b/src/parquet/column/scanner.h
index 184c74d..13fb01b 100644
--- a/src/parquet/column/scanner.h
+++ b/src/parquet/column/scanner.h
@@ -29,7 +29,7 @@
#include "parquet/exception.h"
#include "parquet/schema/descriptor.h"
#include "parquet/types.h"
-#include "parquet/util/mem-allocator.h"
+#include "parquet/util/memory.h"
#include "parquet/util/visibility.h"
namespace parquet {
@@ -44,7 +44,7 @@ class PARQUET_EXPORT Scanner {
: batch_size_(batch_size),
level_offset_(0),
levels_buffered_(0),
- value_buffer_(0, allocator),
+ value_buffer_(std::make_shared<PoolBuffer>(allocator)),
value_offset_(0),
values_buffered_(0),
reader_(reader) {
@@ -76,7 +76,7 @@ class PARQUET_EXPORT Scanner {
int level_offset_;
int levels_buffered_;
- OwnedMutableBuffer value_buffer_;
+ std::shared_ptr<PoolBuffer> value_buffer_;
int value_offset_;
int64_t values_buffered_;
@@ -95,8 +95,8 @@ class PARQUET_EXPORT TypedScanner : public Scanner {
: Scanner(reader, batch_size, allocator) {
typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader.get());
int value_byte_size = type_traits<DType::type_num>::value_byte_size;
- value_buffer_.Resize(batch_size_ * value_byte_size);
- values_ = reinterpret_cast<T*>(&value_buffer_[0]);
+ PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size));
+ values_ = reinterpret_cast<T*>(value_buffer_->mutable_data());
}
virtual ~TypedScanner() {}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/statistics-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics-test.cc b/src/parquet/column/statistics-test.cc
index 364d9d4..c8641a1 100644
--- a/src/parquet/column/statistics-test.cc
+++ b/src/parquet/column/statistics-test.cc
@@ -33,9 +33,7 @@
#include "parquet/file/writer.h"
#include "parquet/schema/descriptor.h"
#include "parquet/types.h"
-#include "parquet/util/input.h"
-#include "parquet/util/mem-allocator.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
namespace parquet {
@@ -150,8 +148,8 @@ class TestRowGroupStatistics : public PrimitiveTypedTest<TestType> {
file_writer->Close();
auto buffer = sink->GetBuffer();
- std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
- auto file_reader = ParquetFileReader::Open(std::move(source));
+ auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
+ auto file_reader = ParquetFileReader::Open(source);
auto rg_reader = file_reader->RowGroup(0);
auto column_chunk = rg_reader->metadata()->ColumnChunk(0);
std::shared_ptr<RowGroupStatistics> stats = column_chunk->statistics();
@@ -191,7 +189,8 @@ std::vector<FLBA> TestRowGroupStatistics<FLBAType>::GetDeepCopy(
std::vector<FLBA> copy;
MemoryAllocator* allocator = default_allocator();
for (const FLBA& flba : values) {
- uint8_t* ptr = allocator->Malloc(FLBA_LENGTH);
+ uint8_t* ptr;
+ PARQUET_THROW_NOT_OK(allocator->Allocate(FLBA_LENGTH, &ptr));
memcpy(ptr, flba.ptr, FLBA_LENGTH);
copy.emplace_back(ptr);
}
@@ -204,7 +203,8 @@ std::vector<ByteArray> TestRowGroupStatistics<ByteArrayType>::GetDeepCopy(
std::vector<ByteArray> copy;
MemoryAllocator* allocator = default_allocator();
for (const ByteArray& ba : values) {
- uint8_t* ptr = allocator->Malloc(ba.len);
+ uint8_t* ptr;
+ PARQUET_THROW_NOT_OK(allocator->Allocate(ba.len, &ptr));
memcpy(ptr, ba.ptr, ba.len);
copy.emplace_back(ba.len, ptr);
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/statistics.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics.cc b/src/parquet/column/statistics.cc
index 0330ac1..9b76fab 100644
--- a/src/parquet/column/statistics.cc
+++ b/src/parquet/column/statistics.cc
@@ -21,16 +21,17 @@
#include "parquet/column/statistics.h"
#include "parquet/encodings/plain-encoding.h"
#include "parquet/exception.h"
-#include "parquet/util/buffer.h"
#include "parquet/util/comparison.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
namespace parquet {
template <typename DType>
TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(
const ColumnDescriptor* schema, MemoryAllocator* allocator)
- : allocator_(allocator), min_buffer_(0, allocator_), max_buffer_(0, allocator_) {
+ : allocator_(allocator),
+ min_buffer_(AllocateBuffer(allocator_, 0)),
+ max_buffer_(AllocateBuffer(allocator_, 0)) {
SetDescr(schema);
Reset();
}
@@ -40,14 +41,14 @@ TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const typename DType::c_
const typename DType::c_type& max, int64_t num_values, int64_t null_count,
int64_t distinct_count)
: allocator_(default_allocator()),
- min_buffer_(0, allocator_),
- max_buffer_(0, allocator_) {
+ min_buffer_(AllocateBuffer(allocator_, 0)),
+ max_buffer_(AllocateBuffer(allocator_, 0)) {
IncrementNumValues(num_values);
IncrementNullCount(null_count);
IncrementDistinctCount(distinct_count);
- Copy(min, &min_, min_buffer_);
- Copy(max, &max_, max_buffer_);
+ Copy(min, &min_, min_buffer_.get());
+ Copy(max, &max_, max_buffer_.get());
has_min_max_ = true;
}
@@ -56,7 +57,9 @@ TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const ColumnDescriptor*
const std::string& encoded_min, const std::string& encoded_max, int64_t num_values,
int64_t null_count, int64_t distinct_count, bool has_min_max,
MemoryAllocator* allocator)
- : allocator_(allocator), min_buffer_(0, allocator_), max_buffer_(0, allocator_) {
+ : allocator_(allocator),
+ min_buffer_(AllocateBuffer(allocator_, 0)),
+ max_buffer_(AllocateBuffer(allocator_, 0)) {
IncrementNumValues(num_values);
IncrementNullCount(null_count);
IncrementDistinctCount(distinct_count);
@@ -94,11 +97,11 @@ void TypedRowGroupStatistics<DType>::Update(
auto batch_minmax = std::minmax_element(values, values + num_not_null, compare);
if (!has_min_max_) {
has_min_max_ = true;
- Copy(*batch_minmax.first, &min_, min_buffer_);
- Copy(*batch_minmax.second, &max_, max_buffer_);
+ Copy(*batch_minmax.first, &min_, min_buffer_.get());
+ Copy(*batch_minmax.second, &max_, max_buffer_.get());
} else {
- Copy(std::min(min_, *batch_minmax.first, compare), &min_, min_buffer_);
- Copy(std::max(max_, *batch_minmax.second, compare), &max_, max_buffer_);
+ Copy(std::min(min_, *batch_minmax.first, compare), &min_, min_buffer_.get());
+ Copy(std::max(max_, *batch_minmax.second, compare), &max_, max_buffer_.get());
}
}
@@ -119,15 +122,15 @@ void TypedRowGroupStatistics<DType>::Merge(const TypedRowGroupStatistics<DType>&
if (!other.HasMinMax()) return;
if (!has_min_max_) {
- Copy(other.min_, &this->min_, min_buffer_);
- Copy(other.max_, &this->max_, max_buffer_);
+ Copy(other.min_, &this->min_, min_buffer_.get());
+ Copy(other.max_, &this->max_, max_buffer_.get());
has_min_max_ = true;
return;
}
Compare<T> compare(descr_);
- Copy(std::min(this->min_, other.min_, compare), &this->min_, min_buffer_);
- Copy(std::max(this->max_, other.max_, compare), &this->max_, max_buffer_);
+ Copy(std::min(this->min_, other.min_, compare), &this->min_, min_buffer_.get());
+ Copy(std::max(this->max_, other.max_, compare), &this->max_, max_buffer_.get());
}
template <typename DType>
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/statistics.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics.h b/src/parquet/column/statistics.h
index a21a0fa..cf41dc0 100644
--- a/src/parquet/column/statistics.h
+++ b/src/parquet/column/statistics.h
@@ -24,8 +24,7 @@
#include "parquet/schema/descriptor.h"
#include "parquet/types.h"
-#include "parquet/util/buffer.h"
-#include "parquet/util/mem-allocator.h"
+#include "parquet/util/memory.h"
#include "parquet/util/visibility.h"
namespace parquet {
@@ -166,34 +165,33 @@ class TypedRowGroupStatistics : public RowGroupStatistics {
void PlainEncode(const T& src, std::string* dst);
void PlainDecode(const std::string& src, T* dst);
- void Copy(const T& src, T* dst, OwnedMutableBuffer& buffer);
+ void Copy(const T& src, T* dst, PoolBuffer* buffer);
- OwnedMutableBuffer min_buffer_, max_buffer_;
+ std::shared_ptr<PoolBuffer> min_buffer_, max_buffer_;
};
template <typename DType>
-inline void TypedRowGroupStatistics<DType>::Copy(
- const T& src, T* dst, OwnedMutableBuffer&) {
+inline void TypedRowGroupStatistics<DType>::Copy(const T& src, T* dst, PoolBuffer*) {
*dst = src;
}
template <>
inline void TypedRowGroupStatistics<FLBAType>::Copy(
- const FLBA& src, FLBA* dst, OwnedMutableBuffer& buffer) {
+ const FLBA& src, FLBA* dst, PoolBuffer* buffer) {
if (dst->ptr == src.ptr) return;
uint32_t len = descr_->type_length();
- buffer.Resize(len);
- std::memcpy(&buffer[0], src.ptr, len);
- *dst = FLBA(buffer.data());
+ PARQUET_THROW_NOT_OK(buffer->Resize(len));
+ std::memcpy(buffer->mutable_data(), src.ptr, len);
+ *dst = FLBA(buffer->data());
}
template <>
inline void TypedRowGroupStatistics<ByteArrayType>::Copy(
- const ByteArray& src, ByteArray* dst, OwnedMutableBuffer& buffer) {
+ const ByteArray& src, ByteArray* dst, PoolBuffer* buffer) {
if (dst->ptr == src.ptr) return;
- buffer.Resize(src.len);
- std::memcpy(&buffer[0], src.ptr, src.len);
- *dst = ByteArray(src.len, buffer.data());
+ PARQUET_THROW_NOT_OK(buffer->Resize(src.len));
+ std::memcpy(buffer->mutable_data(), src.ptr, src.len);
+ *dst = ByteArray(src.len, buffer->data());
}
template <>
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
index 10632d2..9efa623 100644
--- a/src/parquet/column/test-util.h
+++ b/src/parquet/column/test-util.h
@@ -28,13 +28,15 @@
#include <string>
#include <vector>
+#include <gtest/gtest.h>
+
#include "parquet/column/levels.h"
#include "parquet/column/page.h"
// Depended on by SerializedPageReader test utilities for now
#include "parquet/encodings/dictionary-encoding.h"
#include "parquet/encodings/plain-encoding.h"
-#include "parquet/util/input.h"
+#include "parquet/util/memory.h"
#include "parquet/util/test-common.h"
using std::vector;
@@ -253,8 +255,8 @@ class DictionaryPageBuilder {
}
shared_ptr<Buffer> WriteDict() {
- shared_ptr<OwnedMutableBuffer> dict_buffer =
- std::make_shared<OwnedMutableBuffer>(encoder_->dict_encoded_size());
+ std::shared_ptr<PoolBuffer> dict_buffer =
+ AllocateBuffer(default_allocator(), encoder_->dict_encoded_size());
encoder_->WriteDict(dict_buffer->mutable_data());
return dict_buffer;
}
@@ -262,7 +264,7 @@ class DictionaryPageBuilder {
int32_t num_values() const { return num_dict_values_; }
private:
- MemPool pool_;
+ ChunkedAllocator pool_;
shared_ptr<DictEncoder<TYPE>> encoder_;
int32_t num_dict_values_;
bool have_values_;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index 6112efe..7319d46 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -21,6 +21,7 @@
#include "parquet/column/statistics.h"
#include "parquet/encodings/dictionary-encoding.h"
#include "parquet/encodings/plain-encoding.h"
+#include "parquet/util/memory.h"
namespace parquet {
@@ -55,8 +56,8 @@ ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
}
void ColumnWriter::InitSinks() {
- definition_levels_sink_.reset(new InMemoryOutputStream());
- repetition_levels_sink_.reset(new InMemoryOutputStream());
+ definition_levels_sink_.reset(new InMemoryOutputStream(properties_->allocator()));
+ repetition_levels_sink_.reset(new InMemoryOutputStream(properties_->allocator()));
}
void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
@@ -77,7 +78,8 @@ std::shared_ptr<Buffer> ColumnWriter::RleEncodeLevels(
int64_t rle_size =
LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, num_buffered_values_) +
sizeof(uint32_t);
- auto buffer_rle = std::make_shared<OwnedMutableBuffer>(rle_size, allocator_);
+ std::shared_ptr<PoolBuffer> buffer_rle =
+ AllocateBuffer(properties_->allocator(), rle_size);
level_encoder_.Init(Encoding::RLE, max_level, num_buffered_values_,
buffer_rle->mutable_data() + sizeof(uint32_t),
buffer_rle->size() - sizeof(uint32_t));
@@ -87,7 +89,7 @@ std::shared_ptr<Buffer> ColumnWriter::RleEncodeLevels(
reinterpret_cast<uint32_t*>(buffer_rle->mutable_data())[0] = level_encoder_.len();
int64_t encoded_size = level_encoder_.len() + sizeof(uint32_t);
DCHECK(rle_size >= encoded_size);
- buffer_rle->Resize(encoded_size);
+ PARQUET_THROW_NOT_OK(buffer_rle->Resize(encoded_size));
return std::static_pointer_cast<Buffer>(buffer_rle);
}
@@ -110,8 +112,8 @@ void ColumnWriter::AddDataPage() {
definition_levels->size() + repetition_levels->size() + values->size();
// Concatenate data into a single buffer
- std::shared_ptr<OwnedMutableBuffer> uncompressed_data =
- std::make_shared<OwnedMutableBuffer>(uncompressed_size, allocator_);
+ std::shared_ptr<PoolBuffer> uncompressed_data =
+ AllocateBuffer(allocator_, uncompressed_size);
uint8_t* uncompressed_ptr = uncompressed_data->mutable_data();
memcpy(uncompressed_ptr, repetition_levels->data(), repetition_levels->size());
uncompressed_ptr += repetition_levels->size();
@@ -223,7 +225,8 @@ void TypedColumnWriter<Type>::CheckDictionarySizeLimit() {
template <typename Type>
void TypedColumnWriter<Type>::WriteDictionaryPage() {
auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
- auto buffer = std::make_shared<OwnedMutableBuffer>(dict_encoder->dict_encoded_size());
+ std::shared_ptr<PoolBuffer> buffer =
+ AllocateBuffer(properties_->allocator(), dict_encoder->dict_encoded_size());
dict_encoder->WriteDict(buffer->mutable_data());
// TODO Get rid of this deep call
dict_encoder->mem_pool()->FreeAll();
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index 67a29bc..39d5934 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -28,9 +28,7 @@
#include "parquet/file/metadata.h"
#include "parquet/schema/descriptor.h"
#include "parquet/types.h"
-#include "parquet/util/mem-allocator.h"
-#include "parquet/util/mem-pool.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
#include "parquet/util/visibility.h"
namespace parquet {
@@ -111,7 +109,7 @@ class PARQUET_EXPORT ColumnWriter {
LevelEncoder level_encoder_;
MemoryAllocator* allocator_;
- MemPool pool_;
+ ChunkedAllocator pool_;
// The total number of values stored in the data page. This is the maximum of
// the number of encoded definition levels or encoded values. For
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/decoder.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/decoder.h b/src/parquet/encodings/decoder.h
index 4442507..1ac9f35 100644
--- a/src/parquet/encodings/decoder.h
+++ b/src/parquet/encodings/decoder.h
@@ -22,7 +22,7 @@
#include "parquet/exception.h"
#include "parquet/types.h"
-#include "parquet/util/mem-allocator.h"
+#include "parquet/util/memory.h"
namespace parquet {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/delta-bit-pack-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-bit-pack-encoding.h b/src/parquet/encodings/delta-bit-pack-encoding.h
index 5353817..59774a4 100644
--- a/src/parquet/encodings/delta-bit-pack-encoding.h
+++ b/src/parquet/encodings/delta-bit-pack-encoding.h
@@ -24,7 +24,7 @@
#include "parquet/encodings/decoder.h"
#include "parquet/util/bit-stream-utils.inline.h"
-#include "parquet/util/buffer.h"
+#include "parquet/util/memory.h"
namespace parquet {
@@ -36,7 +36,7 @@ class DeltaBitPackDecoder : public Decoder<DType> {
explicit DeltaBitPackDecoder(
const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
: Decoder<DType>(descr, Encoding::DELTA_BINARY_PACKED),
- delta_bit_widths_(0, allocator) {
+ delta_bit_widths_(new PoolBuffer(allocator)) {
if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) {
throw ParquetException("Delta bit pack encoding should only be for integer data.");
}
@@ -62,28 +62,31 @@ class DeltaBitPackDecoder : public Decoder<DType> {
if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException();
if (!decoder_.GetVlqInt(&values_current_block_)) { ParquetException::EofException(); }
if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException();
- delta_bit_widths_.Resize(num_mini_blocks_);
+ PARQUET_THROW_NOT_OK(delta_bit_widths_->Resize(num_mini_blocks_));
+
+ uint8_t* bit_width_data = delta_bit_widths_->mutable_data();
if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
for (int i = 0; i < num_mini_blocks_; ++i) {
- if (!decoder_.GetAligned<uint8_t>(1, &delta_bit_widths_[i])) {
+ if (!decoder_.GetAligned<uint8_t>(1, bit_width_data + i)) {
ParquetException::EofException();
}
}
values_per_mini_block_ = block_size / num_mini_blocks_;
mini_block_idx_ = 0;
- delta_bit_width_ = delta_bit_widths_[0];
+ delta_bit_width_ = bit_width_data[0];
values_current_mini_block_ = values_per_mini_block_;
}
template <typename T>
int GetInternal(T* buffer, int max_values) {
max_values = std::min(max_values, num_values_);
+ const uint8_t* bit_width_data = delta_bit_widths_->data();
for (int i = 0; i < max_values; ++i) {
if (UNLIKELY(values_current_mini_block_ == 0)) {
++mini_block_idx_;
- if (mini_block_idx_ < static_cast<size_t>(delta_bit_widths_.size())) {
- delta_bit_width_ = delta_bit_widths_[mini_block_idx_];
+ if (mini_block_idx_ < static_cast<size_t>(delta_bit_widths_->size())) {
+ delta_bit_width_ = bit_width_data[mini_block_idx_];
values_current_mini_block_ = values_per_mini_block_;
} else {
InitBlock();
@@ -112,7 +115,7 @@ class DeltaBitPackDecoder : public Decoder<DType> {
int32_t min_delta_;
size_t mini_block_idx_;
- OwnedMutableBuffer delta_bit_widths_;
+ std::unique_ptr<PoolBuffer> delta_bit_widths_;
int delta_bit_width_;
int32_t last_value_;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/dictionary-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h
index 7823307..b79744a 100644
--- a/src/parquet/encodings/dictionary-encoding.h
+++ b/src/parquet/encodings/dictionary-encoding.h
@@ -27,11 +27,9 @@
#include "parquet/encodings/decoder.h"
#include "parquet/encodings/encoder.h"
#include "parquet/encodings/plain-encoding.h"
-#include "parquet/util/buffer.h"
#include "parquet/util/cpu-info.h"
#include "parquet/util/hash-util.h"
-#include "parquet/util/mem-allocator.h"
-#include "parquet/util/mem-pool.h"
+#include "parquet/util/memory.h"
#include "parquet/util/rle-encoding.h"
namespace parquet {
@@ -48,7 +46,7 @@ class DictionaryDecoder : public Decoder<Type> {
const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
: Decoder<Type>(descr, Encoding::RLE_DICTIONARY),
dictionary_(0, allocator),
- byte_array_data_(0, allocator) {}
+ byte_array_data_(AllocateBuffer(allocator, 0)) {}
// Perform type-specific initialization
void SetDict(Decoder<Type>* dictionary);
@@ -78,7 +76,7 @@ class DictionaryDecoder : public Decoder<Type> {
// Data that contains the byte array data (byte_array_dictionary_ just has the
// pointers).
- OwnedMutableBuffer byte_array_data_;
+ std::shared_ptr<PoolBuffer> byte_array_data_;
RleDecoder idx_decoder_;
};
@@ -106,11 +104,13 @@ inline void DictionaryDecoder<ByteArrayType>::SetDict(
for (int i = 0; i < num_dictionary_values; ++i) {
total_size += dictionary_[i].len;
}
- byte_array_data_.Resize(total_size);
+ PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size));
int offset = 0;
+
+ uint8_t* bytes_data = byte_array_data_->mutable_data();
for (int i = 0; i < num_dictionary_values; ++i) {
- memcpy(&byte_array_data_[offset], dictionary_[i].ptr, dictionary_[i].len);
- dictionary_[i].ptr = &byte_array_data_[offset];
+ memcpy(bytes_data + offset, dictionary_[i].ptr, dictionary_[i].len);
+ dictionary_[i].ptr = bytes_data + offset;
offset += dictionary_[i].len;
}
}
@@ -124,11 +124,12 @@ inline void DictionaryDecoder<FLBAType>::SetDict(Decoder<FLBAType>* dictionary)
int fixed_len = descr_->type_length();
int total_size = num_dictionary_values * fixed_len;
- byte_array_data_.Resize(total_size);
+ PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size));
+ uint8_t* bytes_data = byte_array_data_->mutable_data();
int offset = 0;
for (int i = 0; i < num_dictionary_values; ++i) {
- memcpy(&byte_array_data_[offset], dictionary_[i].ptr, fixed_len);
- dictionary_[i].ptr = &byte_array_data_[offset];
+ memcpy(bytes_data + offset, dictionary_[i].ptr, fixed_len);
+ dictionary_[i].ptr = bytes_data + offset;
offset += fixed_len;
}
}
@@ -158,7 +159,7 @@ class DictEncoder : public Encoder<DType> {
public:
typedef typename DType::c_type T;
- explicit DictEncoder(const ColumnDescriptor* desc, MemPool* pool = nullptr,
+ explicit DictEncoder(const ColumnDescriptor* desc, ChunkedAllocator* pool = nullptr,
MemoryAllocator* allocator = default_allocator())
: Encoder<DType>(desc, Encoding::PLAIN_DICTIONARY, allocator),
allocator_(allocator),
@@ -176,7 +177,7 @@ class DictEncoder : public Encoder<DType> {
// TODO(wesm): think about how to address the construction semantics in
// encodings/dictionary-encoding.h
- void set_mem_pool(MemPool* pool) { pool_ = pool; }
+ void set_mem_pool(ChunkedAllocator* pool) { pool_ = pool; }
void set_type_length(int type_length) { type_length_ = type_length; }
@@ -215,11 +216,11 @@ class DictEncoder : public Encoder<DType> {
void Put(const T& value);
std::shared_ptr<Buffer> FlushValues() override {
- auto buffer = std::make_shared<OwnedMutableBuffer>(
- EstimatedDataEncodedSize(), this->allocator_);
+ std::shared_ptr<PoolBuffer> buffer =
+ AllocateBuffer(this->allocator_, EstimatedDataEncodedSize());
int result_size = WriteIndices(buffer->mutable_data(), EstimatedDataEncodedSize());
ClearIndices();
- buffer->Resize(result_size);
+ PARQUET_THROW_NOT_OK(buffer->Resize(result_size));
return buffer;
};
@@ -233,7 +234,7 @@ class DictEncoder : public Encoder<DType> {
/// dict_encoded_size() bytes.
void WriteDict(uint8_t* buffer);
- MemPool* mem_pool() { return pool_; }
+ ChunkedAllocator* mem_pool() { return pool_; }
/// The number of entries in the dictionary.
int num_entries() const { return uniques_.size(); }
@@ -242,7 +243,7 @@ class DictEncoder : public Encoder<DType> {
MemoryAllocator* allocator_;
// For ByteArray / FixedLenByteArray data. Not owned
- MemPool* pool_;
+ ChunkedAllocator* pool_;
/// Size of the table. Must be a power of 2.
int hash_table_size_;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/encoder.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoder.h b/src/parquet/encodings/encoder.h
index a325ab5..c51f8d5 100644
--- a/src/parquet/encodings/encoder.h
+++ b/src/parquet/encodings/encoder.h
@@ -23,12 +23,11 @@
#include "parquet/exception.h"
#include "parquet/types.h"
+#include "parquet/util/memory.h"
namespace parquet {
-class Buffer;
class ColumnDescriptor;
-class OutputStream;
// Base class for value encoders. Since encoders may or may not have state (e.g.,
// dictionary encoding) we use a class instance to maintain any state.
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/encoding-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoding-benchmark.cc b/src/parquet/encodings/encoding-benchmark.cc
index e62d758..516e453 100644
--- a/src/parquet/encodings/encoding-benchmark.cc
+++ b/src/parquet/encodings/encoding-benchmark.cc
@@ -19,7 +19,7 @@
#include "parquet/encodings/dictionary-encoding.h"
#include "parquet/file/reader-internal.h"
-#include "parquet/util/mem-pool.h"
+#include "parquet/util/memory.h"
namespace parquet {
@@ -101,23 +101,25 @@ static void DecodeDict(
typedef typename Type::c_type T;
int num_values = values.size();
- MemPool pool;
+ ChunkedAllocator pool;
MemoryAllocator* allocator = default_allocator();
std::shared_ptr<ColumnDescriptor> descr = Int64Schema(Repetition::REQUIRED);
- std::shared_ptr<OwnedMutableBuffer> dict_buffer =
- std::make_shared<OwnedMutableBuffer>();
- auto indices = std::make_shared<OwnedMutableBuffer>();
DictEncoder<Type> encoder(descr.get(), &pool, allocator);
for (int i = 0; i < num_values; ++i) {
encoder.Put(values[i]);
}
- dict_buffer->Resize(encoder.dict_encoded_size());
+ std::shared_ptr<PoolBuffer> dict_buffer =
+ AllocateBuffer(allocator, encoder.dict_encoded_size());
+
+ std::shared_ptr<PoolBuffer> indices =
+ AllocateBuffer(allocator, encoder.EstimatedDataEncodedSize());
+
encoder.WriteDict(dict_buffer->mutable_data());
- indices->Resize(encoder.EstimatedDataEncodedSize());
int actual_bytes = encoder.WriteIndices(indices->mutable_data(), indices->size());
- indices->Resize(actual_bytes);
+
+ PARQUET_THROW_NOT_OK(indices->Resize(actual_bytes));
while (state.KeepRunning()) {
PlainDecoder<Type> dict_decoder(descr.get());
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoding-test.cc b/src/parquet/encodings/encoding-test.cc
index daa25cb..eccfc5d 100644
--- a/src/parquet/encodings/encoding-test.cc
+++ b/src/parquet/encodings/encoding-test.cc
@@ -28,8 +28,7 @@
#include "parquet/schema/types.h"
#include "parquet/types.h"
#include "parquet/util/bit-util.h"
-#include "parquet/util/buffer.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
#include "parquet/util/test-common.h"
using std::string;
@@ -178,7 +177,7 @@ class TestEncodingBase : public ::testing::Test {
}
protected:
- MemPool pool_;
+ ChunkedAllocator pool_;
MemoryAllocator* allocator_;
int num_values_;
@@ -250,10 +249,9 @@ class TestDictionaryEncoding : public TestEncodingBase<Type> {
void CheckRoundtrip() {
DictEncoder<Type> encoder(descr_.get(), &pool_);
- dict_buffer_ = std::make_shared<OwnedMutableBuffer>();
-
ASSERT_NO_THROW(encoder.Put(draws_, num_values_));
- dict_buffer_->Resize(encoder.dict_encoded_size());
+ dict_buffer_ = AllocateBuffer(default_allocator(), encoder.dict_encoded_size());
+
encoder.WriteDict(dict_buffer_->mutable_data());
std::shared_ptr<Buffer> indices = encoder.FlushValues();
@@ -277,7 +275,7 @@ class TestDictionaryEncoding : public TestEncodingBase<Type> {
protected:
USING_BASE_MEMBERS();
- std::shared_ptr<OwnedMutableBuffer> dict_buffer_;
+ std::shared_ptr<PoolBuffer> dict_buffer_;
};
TYPED_TEST_CASE(TestDictionaryEncoding, DictEncodedTypes);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index a3d7b69..d2127ef 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -25,8 +25,7 @@
#include "parquet/encodings/encoder.h"
#include "parquet/schema/descriptor.h"
#include "parquet/util/bit-stream-utils.inline.h"
-#include "parquet/util/buffer.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
namespace parquet {
@@ -163,8 +162,9 @@ class PlainEncoder : public Encoder<DType> {
explicit PlainEncoder(
const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
- : Encoder<DType>(descr, Encoding::PLAIN, allocator),
- values_sink_(new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, allocator)) {}
+ : Encoder<DType>(descr, Encoding::PLAIN, allocator) {
+ values_sink_.reset(new InMemoryOutputStream(allocator));
+ }
int64_t EstimatedDataEncodedSize() override { return values_sink_->Tell(); }
@@ -172,7 +172,7 @@ class PlainEncoder : public Encoder<DType> {
void Put(const T* src, int num_values) override;
protected:
- std::shared_ptr<InMemoryOutputStream> values_sink_;
+ std::unique_ptr<InMemoryOutputStream> values_sink_;
};
template <>
@@ -181,10 +181,10 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
explicit PlainEncoder(
const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
: Encoder<BooleanType>(descr, Encoding::PLAIN, allocator),
- bits_available_(IN_MEMORY_DEFAULT_CAPACITY * 8),
- bits_buffer_(IN_MEMORY_DEFAULT_CAPACITY, allocator),
- values_sink_(new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, allocator)) {
- bit_writer_.reset(new BitWriter(bits_buffer_.mutable_data(), bits_buffer_.size()));
+ bits_available_(kInMemoryDefaultCapacity * 8),
+ bits_buffer_(AllocateBuffer(allocator, kInMemoryDefaultCapacity)),
+ values_sink_(new InMemoryOutputStream(allocator)) {
+ bit_writer_.reset(new BitWriter(bits_buffer_->mutable_data(), bits_buffer_->size()));
}
int64_t EstimatedDataEncodedSize() override {
@@ -196,12 +196,11 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
bit_writer_->Flush();
values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written());
bit_writer_->Clear();
- bits_available_ = bits_buffer_.size() * 8;
+ bits_available_ = bits_buffer_->size() * 8;
}
std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer();
- values_sink_.reset(
- new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, this->allocator_));
+ values_sink_.reset(new InMemoryOutputStream(this->allocator_));
return buffer;
}
@@ -225,7 +224,7 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
\
int bits_remaining = num_values - bit_offset; \
while (bit_offset < num_values) { \
- bits_available_ = bits_buffer_.size() * 8; \
+ bits_available_ = bits_buffer_->size() * 8; \
\
int bits_to_write = std::min(bits_available_, bits_remaining); \
for (int i = bit_offset; i < bit_offset + bits_to_write; i++) { \
@@ -249,15 +248,14 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
protected:
int bits_available_;
std::unique_ptr<BitWriter> bit_writer_;
- OwnedMutableBuffer bits_buffer_;
- std::shared_ptr<InMemoryOutputStream> values_sink_;
+ std::shared_ptr<PoolBuffer> bits_buffer_;
+ std::unique_ptr<InMemoryOutputStream> values_sink_;
};
template <typename DType>
inline std::shared_ptr<Buffer> PlainEncoder<DType>::FlushValues() {
std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer();
- values_sink_.reset(
- new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, this->allocator_));
+ values_sink_.reset(new InMemoryOutputStream(this->allocator_));
return buffer;
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/file-deserialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc
index 5287885..fbb511a 100644
--- a/src/parquet/file/file-deserialize-test.cc
+++ b/src/parquet/file/file-deserialize-test.cc
@@ -33,12 +33,13 @@
#include "parquet/thrift/parquet_types.h"
#include "parquet/thrift/util.h"
#include "parquet/types.h"
-#include "parquet/util/input.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
#include "parquet/util/test-common.h"
namespace parquet {
+using ::arrow::io::BufferReader;
+
// Adds page statistics occupying a certain amount of bytes (for testing very
// large page headers)
static inline void AddDummyStats(int stat_size, format::DataPageHeader& data_page) {
@@ -234,11 +235,13 @@ TEST_F(TestPageSerde, LZONotSupported) {
class TestParquetFileReader : public ::testing::Test {
public:
void AssertInvalidFileThrows(const std::shared_ptr<Buffer>& buffer) {
- std::unique_ptr<BufferReader> reader(new BufferReader(buffer));
reader_.reset(new ParquetFileReader());
+ auto reader = std::make_shared<BufferReader>(buffer);
+ auto wrapper = std::unique_ptr<ArrowInputFile>(new ArrowInputFile(reader));
+
ASSERT_THROW(
- reader_->Open(SerializedFile::Open(std::move(reader))), ParquetException);
+ reader_->Open(SerializedFile::Open(std::move(wrapper))), ParquetException);
}
protected:
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/file-serialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc
index 3a11cd8..7a90eeb 100644
--- a/src/parquet/file/file-serialize-test.cc
+++ b/src/parquet/file/file-serialize-test.cc
@@ -24,8 +24,7 @@
#include "parquet/file/reader.h"
#include "parquet/file/writer.h"
#include "parquet/types.h"
-#include "parquet/util/input.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
namespace parquet {
@@ -75,8 +74,9 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
file_writer->Close();
auto buffer = sink->GetBuffer();
- std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
- auto file_reader = ParquetFileReader::Open(std::move(source));
+
+ auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
+ auto file_reader = ParquetFileReader::Open(source);
ASSERT_EQ(num_columns_, file_reader->metadata()->num_columns());
ASSERT_EQ(1, file_reader->metadata()->num_row_groups());
ASSERT_EQ(100, file_reader->metadata()->num_rows());
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/metadata.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index adfcb69..692a0f5 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -23,6 +23,7 @@
#include "parquet/file/metadata.h"
#include "parquet/schema/converter.h"
#include "parquet/thrift/util.h"
+#include "parquet/util/memory.h"
#include <boost/algorithm/string.hpp>
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/metadata.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index c5dd03a..ef19c98 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -27,7 +27,7 @@
#include "parquet/compression/codec.h"
#include "parquet/schema/descriptor.h"
#include "parquet/types.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
#include "parquet/util/visibility.h"
namespace parquet {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index 37c790c..2c3ebb3 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -32,8 +32,7 @@
#include "parquet/schema/types.h"
#include "parquet/thrift/util.h"
#include "parquet/types.h"
-#include "parquet/util/buffer.h"
-#include "parquet/util/input.h"
+#include "parquet/util/memory.h"
namespace parquet {
@@ -44,7 +43,7 @@ namespace parquet {
SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream,
int64_t total_num_rows, Compression::type codec_type, MemoryAllocator* allocator)
: stream_(std::move(stream)),
- decompression_buffer_(0, allocator),
+ decompression_buffer_(AllocateBuffer(allocator, 0)),
seen_num_rows_(0),
total_num_rows_(total_num_rows) {
max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE;
@@ -97,12 +96,12 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
// Uncompress it if we need to
if (decompressor_ != NULL) {
// Grow the uncompressed buffer if we need to.
- if (uncompressed_len > static_cast<int>(decompression_buffer_.size())) {
- decompression_buffer_.Resize(uncompressed_len);
+ if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
+ PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len));
}
- decompressor_->Decompress(
- compressed_len, buffer, uncompressed_len, &decompression_buffer_[0]);
- buffer = &decompression_buffer_[0];
+ decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
+ decompression_buffer_->mutable_data());
+ buffer = decompression_buffer_->data();
}
auto page_buffer = std::make_shared<Buffer>(buffer, uncompressed_len);
@@ -207,7 +206,7 @@ static constexpr uint32_t FOOTER_SIZE = 8;
static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
std::unique_ptr<ParquetFileReader::Contents> SerializedFile::Open(
- std::unique_ptr<RandomAccessSource> source, ReaderProperties props) {
+ std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props) {
std::unique_ptr<ParquetFileReader::Contents> result(
new SerializedFile(std::move(source), props));
@@ -239,39 +238,40 @@ const FileMetaData* SerializedFile::metadata() const {
}
SerializedFile::SerializedFile(std::unique_ptr<RandomAccessSource> source,
- ReaderProperties props = default_reader_properties())
+ const ReaderProperties& props = default_reader_properties())
: source_(std::move(source)), properties_(props) {}
void SerializedFile::ParseMetaData() {
- int64_t filesize = source_->Size();
+ int64_t file_size = source_->Size();
- if (filesize < FOOTER_SIZE) {
+ if (file_size < FOOTER_SIZE) {
throw ParquetException("Corrupted file, smaller than file footer");
}
uint8_t footer_buffer[FOOTER_SIZE];
- source_->Seek(filesize - FOOTER_SIZE);
+ source_->Seek(file_size - FOOTER_SIZE);
int64_t bytes_read = source_->Read(FOOTER_SIZE, footer_buffer);
if (bytes_read != FOOTER_SIZE || memcmp(footer_buffer + 4, PARQUET_MAGIC, 4) != 0) {
throw ParquetException("Invalid parquet file. Corrupt footer.");
}
uint32_t metadata_len = *reinterpret_cast<uint32_t*>(footer_buffer);
- int64_t metadata_start = filesize - FOOTER_SIZE - metadata_len;
- if (FOOTER_SIZE + metadata_len > filesize) {
+ int64_t metadata_start = file_size - FOOTER_SIZE - metadata_len;
+ // Compare in int64_t: FOOTER_SIZE and metadata_len are both uint32_t, so their
+ // sum wraps for a corrupt length near UINT32_MAX and the check would pass,
+ // leaving metadata_start negative. file_size >= FOOTER_SIZE is checked above.
+ if (metadata_len > file_size - FOOTER_SIZE) {
throw ParquetException(
"Invalid parquet file. File is less than "
"file metadata size.");
}
source_->Seek(metadata_start);
- OwnedMutableBuffer metadata_buffer(metadata_len, properties_.allocator());
- bytes_read = source_->Read(metadata_len, &metadata_buffer[0]);
+ std::shared_ptr<PoolBuffer> metadata_buffer =
+ AllocateBuffer(properties_.allocator(), metadata_len);
+ bytes_read = source_->Read(metadata_len, metadata_buffer->mutable_data());
if (bytes_read != metadata_len) {
throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
}
- file_metadata_ = FileMetaData::Make(&metadata_buffer[0], &metadata_len);
+ file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len);
}
} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/reader-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h
index 582ab35..aa9b75e 100644
--- a/src/parquet/file/reader-internal.h
+++ b/src/parquet/file/reader-internal.h
@@ -29,7 +29,7 @@
#include "parquet/file/reader.h"
#include "parquet/thrift/parquet_types.h"
#include "parquet/types.h"
-#include "parquet/util/input.h"
+#include "parquet/util/memory.h"
namespace parquet {
@@ -62,7 +62,7 @@ class SerializedPageReader : public PageReader {
// Compression codec to use.
std::unique_ptr<Codec> decompressor_;
- OwnedMutableBuffer decompression_buffer_;
+ std::shared_ptr<PoolBuffer> decompression_buffer_;
// Maximum allowed page size
uint32_t max_page_header_size_;
@@ -104,7 +104,7 @@ class SerializedFile : public ParquetFileReader::Contents {
// lifetime separately
static std::unique_ptr<ParquetFileReader::Contents> Open(
std::unique_ptr<RandomAccessSource> source,
- ReaderProperties props = default_reader_properties());
+ const ReaderProperties& props = default_reader_properties());
virtual void Close();
virtual std::shared_ptr<RowGroupReader> GetRowGroup(int i);
virtual const FileMetaData* metadata() const;
@@ -113,7 +113,7 @@ class SerializedFile : public ParquetFileReader::Contents {
private:
// This class takes ownership of the provided data source
explicit SerializedFile(
- std::unique_ptr<RandomAccessSource> source, ReaderProperties props);
+ std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props);
std::unique_ptr<RandomAccessSource> source_;
std::unique_ptr<FileMetaData> file_metadata_;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index 06d2d8e..52fe57a 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -24,14 +24,16 @@
#include <utility>
#include <vector>
+#include "arrow/io/file.h"
+
#include "parquet/column/page.h"
#include "parquet/column/reader.h"
#include "parquet/column/scanner.h"
#include "parquet/exception.h"
#include "parquet/file/reader-internal.h"
#include "parquet/types.h"
-#include "parquet/util/input.h"
#include "parquet/util/logging.h"
+#include "parquet/util/memory.h"
using std::string;
using std::vector;
@@ -69,26 +71,36 @@ ParquetFileReader::~ParquetFileReader() {
}
std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
- std::unique_ptr<RandomAccessSource> source, ReaderProperties props) {
- auto contents = SerializedFile::Open(std::move(source), props);
+ const std::shared_ptr<::arrow::io::ReadableFileInterface>& source,
+ const ReaderProperties& props) {
+ std::unique_ptr<RandomAccessSource> io_wrapper(new ArrowInputFile(source));
+ return Open(std::move(io_wrapper), props);
+}
+std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
+ std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props) {
+ auto contents = SerializedFile::Open(std::move(source), props);
std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
result->Open(std::move(contents));
-
return result;
}
std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
- const std::string& path, bool memory_map, ReaderProperties props) {
- std::unique_ptr<LocalFileSource> file;
+ const std::string& path, bool memory_map, const ReaderProperties& props) {
+ std::shared_ptr<::arrow::io::ReadableFileInterface> source;
if (memory_map) {
- file.reset(new MemoryMapSource(props.allocator()));
+ // memory_map == true selects the memory-mapped reader, matching the
+ // MemoryMapSource branch this replaces.
+ std::shared_ptr<::arrow::io::MemoryMappedFile> handle;
+ PARQUET_THROW_NOT_OK(
+ ::arrow::io::MemoryMappedFile::Open(path, ::arrow::io::FileMode::READ, &handle));
+ source = handle;
} else {
- file.reset(new LocalFileSource(props.allocator()));
+ // memory_map == false selects the regular file reader, matching the
+ // LocalFileSource branch this replaces.
+ std::shared_ptr<::arrow::io::ReadableFile> handle;
+ PARQUET_THROW_NOT_OK(
+ ::arrow::io::ReadableFile::Open(path, props.allocator(), &handle));
+ source = handle;
}
- file->Open(path);
- return Open(std::move(file), props);
+ return Open(source, props);
}
void ParquetFileReader::Open(std::unique_ptr<ParquetFileReader::Contents> contents) {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h
index ca28f67..1c24506 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file/reader.h
@@ -30,12 +30,12 @@
#include "parquet/column/statistics.h"
#include "parquet/file/metadata.h"
#include "parquet/schema/descriptor.h"
+#include "parquet/util/memory.h"
#include "parquet/util/visibility.h"
namespace parquet {
class ColumnReader;
-class RandomAccessSource;
class PARQUET_EXPORT RowGroupReader {
public:
@@ -79,15 +79,27 @@ class PARQUET_EXPORT ParquetFileReader {
ParquetFileReader();
~ParquetFileReader();
+ // Create a reader from some implementation of parquet-cpp's generic file
+ // input interface
+ //
+ // If you cannot provide exclusive access to your file resource, create a
+ // subclass of RandomAccessSource that wraps the shared resource
+ static std::unique_ptr<ParquetFileReader> Open(
+ std::unique_ptr<RandomAccessSource> source,
+ const ReaderProperties& props = default_reader_properties());
+
+ // Create a file reader instance from an Arrow file object. Thread-safety is
+ // the responsibility of the file implementation
+ static std::unique_ptr<ParquetFileReader> Open(
+ const std::shared_ptr<::arrow::io::ReadableFileInterface>& source,
+ const ReaderProperties& props = default_reader_properties());
+
// API Convenience to open a serialized Parquet file on disk, using built-in IO
// interface implementations that were created for testing, and may not be robust for
// all use cases.
static std::unique_ptr<ParquetFileReader> OpenFile(const std::string& path,
- bool memory_map = true, ReaderProperties props = default_reader_properties());
-
- static std::unique_ptr<ParquetFileReader> Open(
- std::unique_ptr<RandomAccessSource> source,
- ReaderProperties props = default_reader_properties());
+ bool memory_map = true,
+ const ReaderProperties& props = default_reader_properties());
void Open(std::unique_ptr<Contents> contents);
void Close();
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index c4681bd..48884ad 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -20,7 +20,7 @@
#include "parquet/column/writer.h"
#include "parquet/schema/converter.h"
#include "parquet/thrift/util.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
using parquet::schema::GroupNode;
using parquet::schema::SchemaFlattener;
@@ -37,6 +37,7 @@ SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type
ColumnChunkMetaDataBuilder* metadata, MemoryAllocator* allocator)
: sink_(sink),
metadata_(metadata),
+ allocator_(allocator),
num_values_(0),
dictionary_page_offset_(0),
data_page_offset_(0),
@@ -71,10 +72,13 @@ std::shared_ptr<Buffer> SerializedPageWriter::Compress(
// Compress the data
int64_t max_compressed_size =
compressor_->MaxCompressedLen(buffer->size(), buffer->data());
- auto compression_buffer = std::make_shared<OwnedMutableBuffer>(max_compressed_size);
+
+ std::shared_ptr<PoolBuffer> compression_buffer =
+ AllocateBuffer(allocator_, max_compressed_size);
+
int64_t compressed_size = compressor_->Compress(buffer->size(), buffer->data(),
max_compressed_size, compression_buffer->mutable_data());
- compression_buffer->Resize(compressed_size);
+ PARQUET_THROW_NOT_OK(compression_buffer->Resize(compressed_size));
return compression_buffer;
}
@@ -182,7 +186,7 @@ void RowGroupSerializer::Close() {
// FileSerializer
std::unique_ptr<ParquetFileWriter::Contents> FileSerializer::Open(
- std::shared_ptr<OutputStream> sink, const std::shared_ptr<GroupNode>& schema,
+ const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema,
const std::shared_ptr<WriterProperties>& properties) {
std::unique_ptr<ParquetFileWriter::Contents> result(
new FileSerializer(sink, schema, properties));
@@ -248,7 +252,7 @@ void FileSerializer::WriteMetaData() {
sink_->Write(PARQUET_MAGIC, 4);
}
-FileSerializer::FileSerializer(std::shared_ptr<OutputStream> sink,
+FileSerializer::FileSerializer(const std::shared_ptr<OutputStream>& sink,
const std::shared_ptr<GroupNode>& schema,
const std::shared_ptr<WriterProperties>& properties)
: sink_(sink),
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index f1f76ab..81a0837 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -26,6 +26,7 @@
#include "parquet/file/metadata.h"
#include "parquet/file/writer.h"
#include "parquet/thrift/parquet_types.h"
+#include "parquet/util/memory.h"
namespace parquet {
@@ -54,6 +55,7 @@ class SerializedPageWriter : public PageWriter {
private:
OutputStream* sink_;
ColumnChunkMetaDataBuilder* metadata_;
+ MemoryAllocator* allocator_;
int64_t num_values_;
int64_t dictionary_page_offset_;
int64_t data_page_offset_;
@@ -102,7 +104,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
class FileSerializer : public ParquetFileWriter::Contents {
public:
static std::unique_ptr<ParquetFileWriter::Contents> Open(
- std::shared_ptr<OutputStream> sink,
+ const std::shared_ptr<OutputStream>& sink,
const std::shared_ptr<schema::GroupNode>& schema,
const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
@@ -119,7 +121,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
virtual ~FileSerializer();
private:
- explicit FileSerializer(std::shared_ptr<OutputStream> sink,
+ explicit FileSerializer(const std::shared_ptr<OutputStream>& sink,
const std::shared_ptr<schema::GroupNode>& schema,
const std::shared_ptr<WriterProperties>& properties);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc
index 8c9f52f..a381c22 100644
--- a/src/parquet/file/writer.cc
+++ b/src/parquet/file/writer.cc
@@ -18,7 +18,7 @@
#include "parquet/file/writer.h"
#include "parquet/file/writer-internal.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
using parquet::schema::GroupNode;
@@ -51,13 +51,19 @@ ParquetFileWriter::~ParquetFileWriter() {
}
std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
- std::shared_ptr<OutputStream> sink, const std::shared_ptr<GroupNode>& schema,
+ const std::shared_ptr<::arrow::io::OutputStream>& sink,
+ const std::shared_ptr<GroupNode>& schema,
const std::shared_ptr<WriterProperties>& properties) {
- auto contents = FileSerializer::Open(sink, schema, properties);
+ return Open(std::make_shared<ArrowOutputStream>(sink), schema, properties);
+}
+std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
+ const std::shared_ptr<OutputStream>& sink,
+ const std::shared_ptr<schema::GroupNode>& schema,
+ const std::shared_ptr<WriterProperties>& properties) {
+ auto contents = FileSerializer::Open(sink, schema, properties);
std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter());
result->Open(std::move(contents));
-
return result;
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h
index e82f016..6d7161b 100644
--- a/src/parquet/file/writer.h
+++ b/src/parquet/file/writer.h
@@ -24,7 +24,7 @@
#include "parquet/column/properties.h"
#include "parquet/schema/descriptor.h"
#include "parquet/schema/types.h"
-#include "parquet/util/mem-allocator.h"
+#include "parquet/util/memory.h"
#include "parquet/util/visibility.h"
namespace parquet {
@@ -97,7 +97,13 @@ class PARQUET_EXPORT ParquetFileWriter {
ParquetFileWriter();
~ParquetFileWriter();
- static std::unique_ptr<ParquetFileWriter> Open(std::shared_ptr<OutputStream> sink,
+ static std::unique_ptr<ParquetFileWriter> Open(
+ const std::shared_ptr<::arrow::io::OutputStream>& sink,
+ const std::shared_ptr<schema::GroupNode>& schema,
+ const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
+
+ static std::unique_ptr<ParquetFileWriter> Open(
+ const std::shared_ptr<OutputStream>& sink,
const std::shared_ptr<schema::GroupNode>& schema,
const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index d21a809..e3be9b0 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -23,17 +23,20 @@
#include <memory>
#include <string>
+#include "arrow/io/file.h"
+
#include "parquet/column/reader.h"
#include "parquet/column/scanner.h"
#include "parquet/file/reader-internal.h"
#include "parquet/file/reader.h"
-#include "parquet/util/input.h"
-#include "parquet/util/mem-allocator.h"
+#include "parquet/util/memory.h"
using std::string;
namespace parquet {
+using ReadableFile = ::arrow::io::ReadableFile;
+
const char* data_dir = std::getenv("PARQUET_TEST_DATA");
std::string alltypes_plain() {
@@ -159,7 +162,7 @@ TEST_F(TestAllTypesPlain, ColumnSelectionOutOfRange) {
ASSERT_THROW(reader_->DebugPrint(ss, columns), ParquetException);
}
-class TestLocalFileSource : public ::testing::Test {
+class TestLocalFile : public ::testing::Test {
public:
void SetUp() {
std::string dir_string(data_dir);
@@ -168,24 +171,25 @@ class TestLocalFileSource : public ::testing::Test {
ss << dir_string << "/"
<< "alltypes_plain.parquet";
- file.reset(new LocalFileSource());
- file->Open(ss.str());
+ PARQUET_THROW_NOT_OK(ReadableFile::Open(ss.str(), &handle));
+ fileno = handle->file_descriptor();
}
void TearDown() {}
protected:
- std::unique_ptr<LocalFileSource> file;
+ int fileno;
+ std::shared_ptr<::arrow::io::ReadableFile> handle;
};
-TEST_F(TestLocalFileSource, FileClosedOnDestruction) {
- int file_desc = file->file_descriptor();
+TEST_F(TestLocalFile, FileClosedOnDestruction) {
{
- auto contents = SerializedFile::Open(std::move(file));
+ auto contents = SerializedFile::Open(
+ std::unique_ptr<RandomAccessSource>(new ArrowInputFile(handle)));
std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
result->Open(std::move(contents));
}
- ASSERT_EQ(-1, fcntl(file_desc, F_GETFD));
+ ASSERT_EQ(-1, fcntl(fileno, F_GETFD));
ASSERT_EQ(EBADF, errno);
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/thrift/util.h
----------------------------------------------------------------------
diff --git a/src/parquet/thrift/util.h b/src/parquet/thrift/util.h
index 1800435..9d2b66f 100644
--- a/src/parquet/thrift/util.h
+++ b/src/parquet/thrift/util.h
@@ -37,7 +37,7 @@
#include "parquet/exception.h"
#include "parquet/thrift/parquet_types.h"
#include "parquet/util/logging.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
namespace parquet {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/util/CMakeLists.txt b/src/parquet/util/CMakeLists.txt
index 3a4b1c9..7a9ccba 100644
--- a/src/parquet/util/CMakeLists.txt
+++ b/src/parquet/util/CMakeLists.txt
@@ -20,17 +20,13 @@ install(FILES
bit-stream-utils.h
bit-stream-utils.inline.h
bit-util.h
- buffer.h
buffer-builder.h
compiler-util.h
cpu-info.h
hash-util.h
- input.h
logging.h
macros.h
- mem-allocator.h
- mem-pool.h
- output.h
+ memory.h
rle-encoding.h
stopwatch.h
sse-util.h
@@ -70,9 +66,6 @@ if (PARQUET_BUILD_BENCHMARKS)
endif()
ADD_PARQUET_TEST(bit-util-test)
-ADD_PARQUET_TEST(buffer-test)
ADD_PARQUET_TEST(comparison-test)
-ADD_PARQUET_TEST(input-output-test)
-ADD_PARQUET_TEST(mem-allocator-test)
-ADD_PARQUET_TEST(mem-pool-test)
+ADD_PARQUET_TEST(memory-test)
ADD_PARQUET_TEST(rle-test)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/buffer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/buffer-test.cc b/src/parquet/util/buffer-test.cc
deleted file mode 100644
index ee5b000..0000000
--- a/src/parquet/util/buffer-test.cc
+++ /dev/null
@@ -1,65 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <cstdint>
-#include <cstdlib>
-#include <exception>
-#include <gtest/gtest.h>
-#include <limits>
-#include <memory>
-#include <string>
-
-#include "parquet/exception.h"
-#include "parquet/util/buffer.h"
-
-using std::string;
-
-namespace parquet {
-
-class TestBuffer : public ::testing::Test {};
-
-TEST_F(TestBuffer, Resize) {
- OwnedMutableBuffer buf;
-
- ASSERT_EQ(0, buf.size());
- ASSERT_NO_THROW(buf.Resize(100));
- ASSERT_EQ(100, buf.size());
- ASSERT_NO_THROW(buf.Resize(200));
- ASSERT_EQ(200, buf.size());
-
- // Make it smaller, too
- ASSERT_NO_THROW(buf.Resize(50));
- ASSERT_EQ(50, buf.size());
-}
-
-TEST_F(TestBuffer, ResizeOOM) {
-// Tests that deliberately throw Exceptions foul up valgrind and report
-// red herring memory leaks
-#ifndef PARQUET_VALGRIND
- OwnedMutableBuffer buf;
- ASSERT_NO_THROW(buf.Resize(100));
- int64_t to_alloc = std::numeric_limits<int64_t>::max();
- try {
- buf.Resize(to_alloc);
- FAIL() << "Exception not thrown";
- } catch (const ParquetException& e) {
- // pass
- } catch (const std::exception& e) { FAIL() << "Different exception thrown"; }
-#endif
-}
-
-} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/buffer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/buffer.cc b/src/parquet/util/buffer.cc
deleted file mode 100644
index 0b7100c..0000000
--- a/src/parquet/util/buffer.cc
+++ /dev/null
@@ -1,123 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/util/buffer.h"
-
-#include <algorithm>
-#include <cstdint>
-
-#include "parquet/exception.h"
-#include "parquet/types.h"
-
-namespace parquet {
-
-Buffer::Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size) {
- data_ = parent->data() + offset;
- size_ = size;
- parent_ = parent;
-}
-
-std::shared_ptr<Buffer> MutableBuffer::GetImmutableView() {
- return std::make_shared<Buffer>(this->get_shared_ptr(), 0, size());
-}
-
-OwnedMutableBuffer::OwnedMutableBuffer(int64_t size, MemoryAllocator* allocator)
- : ResizableBuffer(nullptr, 0), allocator_(allocator) {
- Resize(size);
-}
-
-OwnedMutableBuffer::~OwnedMutableBuffer() {
- if (mutable_data_) { allocator_->Free(mutable_data_, capacity_); }
-}
-
-void OwnedMutableBuffer::Reserve(int64_t new_capacity) {
- if (!mutable_data_ || new_capacity > capacity_) {
- if (mutable_data_) {
- uint8_t* new_data = allocator_->Malloc(new_capacity);
- memcpy(new_data, mutable_data_, size_);
- allocator_->Free(mutable_data_, capacity_);
- mutable_data_ = new_data;
- } else {
- mutable_data_ = allocator_->Malloc(new_capacity);
- }
- data_ = mutable_data_;
- capacity_ = new_capacity;
- }
-}
-
-void OwnedMutableBuffer::Resize(int64_t new_size) {
- Reserve(new_size);
- size_ = new_size;
-}
-
-uint8_t& OwnedMutableBuffer::operator[](int64_t i) {
- return mutable_data_[i];
-}
-
-template <class T>
-Vector<T>::Vector(int64_t size, MemoryAllocator* allocator)
- : buffer_(new OwnedMutableBuffer(size * sizeof(T), allocator)),
- size_(size),
- capacity_(size) {
- if (size > 0) {
- data_ = reinterpret_cast<T*>(buffer_->mutable_data());
- } else {
- data_ = nullptr;
- }
-}
-
-template <class T>
-void Vector<T>::Reserve(int64_t new_capacity) {
- if (new_capacity > capacity_) {
- buffer_->Resize(new_capacity * sizeof(T));
- data_ = reinterpret_cast<T*>(buffer_->mutable_data());
- capacity_ = new_capacity;
- }
-}
-
-template <class T>
-void Vector<T>::Resize(int64_t new_size) {
- Reserve(new_size);
- size_ = new_size;
-}
-
-template <class T>
-void Vector<T>::Assign(int64_t size, const T val) {
- Resize(size);
- for (int64_t i = 0; i < size_; i++) {
- data_[i] = val;
- }
-}
-
-template <class T>
-void Vector<T>::Swap(Vector<T>& v) {
- buffer_.swap(v.buffer_);
- std::swap(size_, v.size_);
- std::swap(capacity_, v.capacity_);
- std::swap(data_, v.data_);
-}
-
-template class Vector<int32_t>;
-template class Vector<int64_t>;
-template class Vector<bool>;
-template class Vector<float>;
-template class Vector<double>;
-template class Vector<Int96>;
-template class Vector<ByteArray>;
-template class Vector<FixedLenByteArray>;
-
-} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/buffer.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/buffer.h b/src/parquet/util/buffer.h
deleted file mode 100644
index 58a5f5e..0000000
--- a/src/parquet/util/buffer.h
+++ /dev/null
@@ -1,149 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_UTIL_BUFFER_H
-#define PARQUET_UTIL_BUFFER_H
-
-#include <cstdint>
-#include <cstdlib>
-#include <cstring>
-#include <memory>
-#include <vector>
-
-#include "parquet/util/macros.h"
-#include "parquet/util/mem-allocator.h"
-#include "parquet/util/visibility.h"
-
-namespace parquet {
-
-// ----------------------------------------------------------------------
-// Buffer classes
-
-// Immutable API for a chunk of bytes which may or may not be owned by the
-// class instance
-class PARQUET_EXPORT Buffer : public std::enable_shared_from_this<Buffer> {
- public:
- Buffer(const uint8_t* data, int64_t size) : data_(data), size_(size) {}
-
- // An offset into data that is owned by another buffer, but we want to be
- // able to retain a valid pointer to it even after other shared_ptr's to the
- // parent buffer have been destroyed
- Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size);
-
- std::shared_ptr<Buffer> get_shared_ptr() { return shared_from_this(); }
-
- // Return true if both buffers are the same size and contain the same bytes
- // up to the number of compared bytes
- bool Equals(const Buffer& other, int64_t nbytes) const {
- return this == &other || (size_ >= nbytes && other.size_ >= nbytes &&
- !memcmp(data_, other.data_, nbytes));
- }
-
- bool Equals(const Buffer& other) const {
- return this == &other || (size_ == other.size_ && !memcmp(data_, other.data_, size_));
- }
-
- const uint8_t* data() const { return data_; }
-
- int64_t size() const { return size_; }
-
- // Returns true if this Buffer is referencing memory (possibly) owned by some
- // other buffer
- bool is_shared() const { return static_cast<bool>(parent_); }
-
- const std::shared_ptr<Buffer> parent() const { return parent_; }
-
- protected:
- const uint8_t* data_;
- int64_t size_;
-
- // nullptr by default, but may be set
- std::shared_ptr<Buffer> parent_;
-
- private:
- DISALLOW_COPY_AND_ASSIGN(Buffer);
-};
-
-// A Buffer whose contents can be mutated. May or may not own its data.
-class PARQUET_EXPORT MutableBuffer : public Buffer {
- public:
- MutableBuffer(uint8_t* data, int64_t size) : Buffer(data, size) {
- mutable_data_ = data;
- }
-
- uint8_t* mutable_data() { return mutable_data_; }
-
- // Get a read-only view of this buffer
- std::shared_ptr<Buffer> GetImmutableView();
-
- protected:
- MutableBuffer() : Buffer(nullptr, 0), mutable_data_(nullptr) {}
-
- uint8_t* mutable_data_;
-};
-
-class PARQUET_EXPORT ResizableBuffer : public MutableBuffer {
- public:
- virtual void Resize(int64_t new_size) = 0;
-
- protected:
- ResizableBuffer(uint8_t* data, int64_t size)
- : MutableBuffer(data, size), capacity_(size) {}
- int64_t capacity_;
-};
-
-// A ResizableBuffer whose memory is owned by the class instance. For example,
-// for reading data out of files that you want to deallocate when this class is
-// garbage-collected
-class PARQUET_EXPORT OwnedMutableBuffer : public ResizableBuffer {
- public:
- explicit OwnedMutableBuffer(
- int64_t size = 0, MemoryAllocator* allocator = default_allocator());
- virtual ~OwnedMutableBuffer();
- void Resize(int64_t new_size) override;
- void Reserve(int64_t new_capacity);
- uint8_t& operator[](int64_t i);
-
- private:
- // TODO: aligned allocations
- MemoryAllocator* allocator_;
-
- DISALLOW_COPY_AND_ASSIGN(OwnedMutableBuffer);
-};
-
-template <class T>
-class Vector {
- public:
- explicit Vector(int64_t size, MemoryAllocator* allocator);
- void Resize(int64_t new_size);
- void Reserve(int64_t new_capacity);
- void Assign(int64_t size, const T val);
- void Swap(Vector<T>& v);
- inline T& operator[](int64_t i) const { return data_[i]; }
-
- private:
- std::unique_ptr<OwnedMutableBuffer> buffer_;
- int64_t size_;
- int64_t capacity_;
- T* data_;
-
- DISALLOW_COPY_AND_ASSIGN(Vector);
-};
-
-} // namespace parquet
-
-#endif // PARQUET_UTIL_BUFFER_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/input-output-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input-output-test.cc b/src/parquet/util/input-output-test.cc
deleted file mode 100644
index 72aad9c..0000000
--- a/src/parquet/util/input-output-test.cc
+++ /dev/null
@@ -1,244 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include <cstdint>
-#include <cstdio>
-#include <fstream>
-#include <iostream>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "parquet/exception.h"
-#include "parquet/util/buffer.h"
-#include "parquet/util/input.h"
-#include "parquet/util/mem-allocator.h"
-#include "parquet/util/output.h"
-#include "parquet/util/test-common.h"
-
-namespace parquet {
-
-TEST(TestBufferedInputStream, Basics) {
- int64_t source_size = 256;
- int64_t stream_offset = 10;
- int64_t stream_size = source_size - stream_offset;
- int64_t chunk_size = 50;
- auto buf = std::make_shared<OwnedMutableBuffer>(source_size);
- ASSERT_EQ(source_size, buf->size());
- for (int i = 0; i < source_size; i++) {
- buf->mutable_data()[i] = i;
- }
-
- std::unique_ptr<BufferReader> source(new BufferReader(buf));
- std::unique_ptr<MemoryAllocator> allocator(new TrackingAllocator());
- std::unique_ptr<BufferedInputStream> stream(new BufferedInputStream(
- allocator.get(), chunk_size, source.get(), stream_offset, stream_size));
-
- const uint8_t* output;
- int64_t bytes_read;
-
- // source is at offset 10
- output = stream->Peek(10, &bytes_read);
- ASSERT_EQ(10, bytes_read);
- for (int i = 0; i < 10; i++) {
- ASSERT_EQ(10 + i, output[i]) << i;
- }
- output = stream->Read(10, &bytes_read);
- ASSERT_EQ(10, bytes_read);
- for (int i = 0; i < 10; i++) {
- ASSERT_EQ(10 + i, output[i]) << i;
- }
- output = stream->Read(10, &bytes_read);
- ASSERT_EQ(10, bytes_read);
- for (int i = 0; i < 10; i++) {
- ASSERT_EQ(20 + i, output[i]) << i;
- }
- stream->Advance(5);
- stream->Advance(5);
- // source is at offset 40
- // read across buffer boundary. buffer size is 50
- output = stream->Read(20, &bytes_read);
- ASSERT_EQ(20, bytes_read);
- for (int i = 0; i < 20; i++) {
- ASSERT_EQ(40 + i, output[i]) << i;
- }
- // read more than original chunk_size
- output = stream->Read(60, &bytes_read);
- ASSERT_EQ(60, bytes_read);
- for (int i = 0; i < 60; i++) {
- ASSERT_EQ(60 + i, output[i]) << i;
- }
-
- stream->Advance(120);
- // source is at offset 240
- // read outside of source boundary. source size is 256
- output = stream->Read(30, &bytes_read);
- ASSERT_EQ(16, bytes_read);
- for (int i = 0; i < 16; i++) {
- ASSERT_EQ(240 + i, output[i]) << i;
- }
-}
-
-TEST(TestInMemoryOutputStream, Basics) {
- std::unique_ptr<InMemoryOutputStream> stream(new InMemoryOutputStream(8));
-
- std::vector<uint8_t> data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
-
- stream->Write(&data[0], 4);
- ASSERT_EQ(4, stream->Tell());
- stream->Write(&data[4], data.size() - 4);
-
- std::shared_ptr<Buffer> buffer = stream->GetBuffer();
-
- Buffer data_buf(data.data(), data.size());
-
- ASSERT_TRUE(data_buf.Equals(*buffer));
-}
-
-TEST(TestBufferedReader, Basics) {
- std::vector<uint8_t> data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
- auto buffer = std::make_shared<Buffer>(data.data(), data.size());
- BufferReader reader(buffer);
-
- uint8_t out[4];
- ASSERT_EQ(4, reader.Read(4, out));
- ASSERT_EQ(4, reader.Tell());
- ASSERT_EQ(0, out[0]);
- ASSERT_EQ(1, out[1]);
- ASSERT_EQ(2, out[2]);
- ASSERT_EQ(3, out[3]);
-
- reader.Seek(8);
- ASSERT_EQ(8, reader.Tell());
-
- auto out_buffer = reader.Read(5);
- ASSERT_EQ(8, out_buffer->data()[0]);
- ASSERT_EQ(9, out_buffer->data()[1]);
- ASSERT_EQ(10, out_buffer->data()[2]);
- ASSERT_EQ(11, out_buffer->data()[3]);
- ASSERT_EQ(12, out_buffer->data()[4]);
-
- // Read past the end of the buffer
- ASSERT_EQ(13, reader.Tell());
- ASSERT_EQ(0, reader.Read(4, out));
- ASSERT_EQ(0, reader.Read(4)->size());
-
- reader.Close();
-}
-
-static bool file_exists(const std::string& path) {
- return std::ifstream(path.c_str()).good();
-}
-
-template <typename ReaderType>
-class TestFileReaders : public ::testing::Test {
- public:
- void SetUp() {
- test_path_ = "parquet-input-output-test.txt";
- if (file_exists(test_path_)) { std::remove(test_path_.c_str()); }
- test_data_ = "testingdata";
-
- std::ofstream stream;
- stream.open(test_path_.c_str());
- stream << test_data_;
- filesize_ = test_data_.size();
- }
-
- void TearDown() { DeleteTestFile(); }
-
- void DeleteTestFile() {
- if (file_exists(test_path_)) { std::remove(test_path_.c_str()); }
- }
-
- protected:
- ReaderType source;
- std::string test_path_;
- std::string test_data_;
- int filesize_;
-};
-
-typedef ::testing::Types<LocalFileSource, MemoryMapSource> ReaderTypes;
-
-TYPED_TEST_CASE(TestFileReaders, ReaderTypes);
-
-TYPED_TEST(TestFileReaders, NonExistentFile) {
- ASSERT_THROW(this->source.Open("0xDEADBEEF.txt"), ParquetException);
-}
-
-TYPED_TEST(TestFileReaders, Read) {
- this->source.Open(this->test_path_);
-
- ASSERT_EQ(this->filesize_, this->source.Size());
-
- std::shared_ptr<Buffer> buffer = this->source.Read(4);
- ASSERT_EQ(4, buffer->size());
- ASSERT_EQ(0, memcmp(this->test_data_.c_str(), buffer->data(), 4));
-
- // Read past EOF
- buffer = this->source.Read(10);
- ASSERT_EQ(7, buffer->size());
- ASSERT_EQ(0, memcmp(this->test_data_.c_str() + 4, buffer->data(), 7));
-}
-
-TYPED_TEST(TestFileReaders, FileDisappeared) {
- this->source.Open(this->test_path_);
- this->source.Seek(4);
- this->DeleteTestFile();
- this->source.Close();
-}
-
-TYPED_TEST(TestFileReaders, BadSeek) {
- this->source.Open(this->test_path_);
- ASSERT_THROW(this->source.Seek(this->filesize_ + 1), ParquetException);
-}
-
-class TestFileWriter : public ::testing::Test {
- public:
- void SetUp() {
- test_path_ = "parquet-input-output-test.txt";
- if (file_exists(test_path_)) { std::remove(test_path_.c_str()); }
- }
-
- void TearDown() { DeleteTestFile(); }
-
- void DeleteTestFile() {
- if (file_exists(test_path_)) { std::remove(test_path_.c_str()); }
- }
-
- protected:
- std::string test_path_;
- uint8_t test_data_[4] = {1, 2, 3, 4};
-};
-
-TEST_F(TestFileWriter, Write) {
- LocalFileOutputStream sink(test_path_);
- ASSERT_EQ(0, sink.Tell());
- sink.Write(test_data_, 4);
- ASSERT_EQ(4, sink.Tell());
- sink.Close();
-
- // Check that the correct content was written
- LocalFileSource source;
- source.Open(test_path_);
- std::shared_ptr<Buffer> buffer = source.Read(4);
- ASSERT_EQ(4, buffer->size());
- ASSERT_EQ(0, memcmp(test_data_, buffer->data(), 4));
-}
-
-} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/input.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.cc b/src/parquet/util/input.cc
deleted file mode 100644
index 127b90c..0000000
--- a/src/parquet/util/input.cc
+++ /dev/null
@@ -1,285 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/util/input.h"
-
-#include <algorithm>
-#include <sstream>
-#include <string>
-#include <sys/mman.h>
-
-#include "parquet/exception.h"
-#include "parquet/util/buffer.h"
-#include "parquet/util/logging.h"
-
-namespace parquet {
-
-// ----------------------------------------------------------------------
-// RandomAccessSource
-
-std::shared_ptr<Buffer> RandomAccessSource::ReadAt(int64_t pos, int64_t nbytes) {
- Seek(pos);
- return Read(nbytes);
-}
-
-int64_t RandomAccessSource::Size() const {
- return size_;
-}
-
-// ----------------------------------------------------------------------
-// LocalFileSource
-
-LocalFileSource::~LocalFileSource() {
- CloseFile();
-}
-
-void LocalFileSource::Open(const std::string& path) {
- path_ = path;
- file_ = fopen(path_.c_str(), "rb");
- if (file_ == nullptr || ferror(file_)) {
- std::stringstream ss;
- ss << "Unable to open file: " << path;
- throw ParquetException(ss.str());
- }
- is_open_ = true;
- SeekFile(0, SEEK_END);
- size_ = LocalFileSource::Tell();
- Seek(0);
-}
-
-void LocalFileSource::SeekFile(int64_t pos, int origin) {
- if (origin == SEEK_SET && (pos < 0 || pos >= size_)) {
- std::stringstream ss;
- ss << "Position " << pos << " is not in range.";
- throw ParquetException(ss.str());
- }
-
- if (0 != fseek(file_, pos, origin)) {
- std::stringstream ss;
- ss << "File seek to position " << pos << " failed.";
- throw ParquetException(ss.str());
- }
-}
-
-void LocalFileSource::Close() {
- // Pure virtual
- CloseFile();
-}
-
-void LocalFileSource::CloseFile() {
- if (is_open_) {
- fclose(file_);
- is_open_ = false;
- }
-}
-
-void LocalFileSource::Seek(int64_t pos) {
- SeekFile(pos);
-}
-
-int64_t LocalFileSource::Tell() const {
- int64_t position = ftell(file_);
- if (position < 0) { throw ParquetException("ftell failed, did the file disappear?"); }
- return position;
-}
-
-int LocalFileSource::file_descriptor() const {
- return fileno(file_);
-}
-
-int64_t LocalFileSource::Read(int64_t nbytes, uint8_t* buffer) {
- return fread(buffer, 1, nbytes, file_);
-}
-
-std::shared_ptr<Buffer> LocalFileSource::Read(int64_t nbytes) {
- auto result = std::make_shared<OwnedMutableBuffer>(0, allocator_);
- result->Resize(nbytes);
-
- int64_t bytes_read = Read(nbytes, result->mutable_data());
- if (bytes_read < nbytes) { result->Resize(bytes_read); }
- return result;
-}
-// ----------------------------------------------------------------------
-// MemoryMapSource methods
-
-MemoryMapSource::~MemoryMapSource() {
- CloseFile();
-}
-
-void MemoryMapSource::Open(const std::string& path) {
- LocalFileSource::Open(path);
- data_ = reinterpret_cast<uint8_t*>(
- mmap(nullptr, size_, PROT_READ, MAP_SHARED, fileno(file_), 0));
- if (data_ == nullptr) { throw ParquetException("Memory mapping file failed"); }
- pos_ = 0;
-}
-
-void MemoryMapSource::Close() {
- // Pure virtual
- CloseFile();
-}
-
-void MemoryMapSource::CloseFile() {
- if (data_ != nullptr) {
- munmap(data_, size_);
- data_ = nullptr;
- }
-
- LocalFileSource::CloseFile();
-}
-
-void MemoryMapSource::Seek(int64_t pos) {
- if (pos < 0 || pos >= size_) {
- std::stringstream ss;
- ss << "Position " << pos << " is not in range.";
- throw ParquetException(ss.str());
- }
-
- pos_ = pos;
-}
-
-int64_t MemoryMapSource::Tell() const {
- return pos_;
-}
-
-int64_t MemoryMapSource::Read(int64_t nbytes, uint8_t* buffer) {
- int64_t bytes_available = std::min(nbytes, size_ - pos_);
- memcpy(buffer, data_ + pos_, bytes_available);
- pos_ += bytes_available;
- return bytes_available;
-}
-
-std::shared_ptr<Buffer> MemoryMapSource::Read(int64_t nbytes) {
- int64_t bytes_available = std::min(nbytes, size_ - pos_);
- auto result = std::make_shared<Buffer>(data_ + pos_, bytes_available);
- pos_ += bytes_available;
- return result;
-}
-
-// ----------------------------------------------------------------------
-// BufferReader
-
-BufferReader::BufferReader(const std::shared_ptr<Buffer>& buffer)
- : buffer_(buffer), data_(buffer->data()), pos_(0) {
- size_ = buffer->size();
-}
-
-int64_t BufferReader::Tell() const {
- return pos_;
-}
-
-void BufferReader::Seek(int64_t pos) {
- if (pos < 0 || pos >= size_) {
- std::stringstream ss;
- ss << "Cannot seek to " << pos << "File is length " << size_;
- throw ParquetException(ss.str());
- }
- pos_ = pos;
-}
-
-int64_t BufferReader::Read(int64_t nbytes, uint8_t* out) {
- int64_t bytes_available = std::min(nbytes, size_ - pos_);
- memcpy(out, Head(), bytes_available);
- pos_ += bytes_available;
- return bytes_available;
-}
-
-std::shared_ptr<Buffer> BufferReader::Read(int64_t nbytes) {
- int64_t bytes_available = std::min(nbytes, size_ - pos_);
- auto result = std::make_shared<Buffer>(Head(), bytes_available);
- pos_ += bytes_available;
- return result;
-}
-
-// ----------------------------------------------------------------------
-// InMemoryInputStream
-
-InMemoryInputStream::InMemoryInputStream(const std::shared_ptr<Buffer>& buffer)
- : buffer_(buffer), offset_(0) {
- len_ = buffer_->size();
-}
-
-InMemoryInputStream::InMemoryInputStream(
- RandomAccessSource* source, int64_t start, int64_t num_bytes)
- : offset_(0) {
- buffer_ = source->ReadAt(start, num_bytes);
- if (buffer_->size() < num_bytes) {
- throw ParquetException("Unable to read column chunk data");
- }
- len_ = buffer_->size();
-}
-
-const uint8_t* InMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
- *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_);
- return buffer_->data() + offset_;
-}
-
-const uint8_t* InMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
- const uint8_t* result = Peek(num_to_read, num_bytes);
- offset_ += *num_bytes;
- return result;
-}
-
-void InMemoryInputStream::Advance(int64_t num_bytes) {
- offset_ += num_bytes;
-}
-
-// ----------------------------------------------------------------------
-// BufferedInputStream
-BufferedInputStream::BufferedInputStream(MemoryAllocator* pool, int64_t buffer_size,
- RandomAccessSource* source, int64_t start, int64_t num_bytes)
- : source_(source), stream_offset_(start), stream_end_(start + num_bytes) {
- buffer_ = std::make_shared<OwnedMutableBuffer>(buffer_size, pool);
- buffer_size_ = buffer_->size();
- // Required to force a lazy read
- buffer_offset_ = buffer_size_;
-}
-
-const uint8_t* BufferedInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
- *num_bytes = std::min(num_to_peek, stream_end_ - stream_offset_);
- // increase the buffer size if needed
- if (*num_bytes > buffer_size_) {
- buffer_->Resize(*num_bytes);
- buffer_size_ = buffer_->size();
- DCHECK(buffer_size_ >= *num_bytes);
- }
- // Read more data when buffer has insufficient left or when resized
- if (*num_bytes > (buffer_size_ - buffer_offset_)) {
- source_->Seek(stream_offset_);
- buffer_size_ = std::min(buffer_size_, stream_end_ - stream_offset_);
- int64_t bytes_read = source_->Read(buffer_size_, buffer_->mutable_data());
- if (bytes_read < *num_bytes) {
- throw ParquetException("Failed reading column data from source");
- }
- buffer_offset_ = 0;
- }
- return buffer_->data() + buffer_offset_;
-}
-
-const uint8_t* BufferedInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
- const uint8_t* result = Peek(num_to_read, num_bytes);
- stream_offset_ += *num_bytes;
- buffer_offset_ += *num_bytes;
- return result;
-}
-
-void BufferedInputStream::Advance(int64_t num_bytes) {
- stream_offset_ += num_bytes;
- buffer_offset_ += num_bytes;
-}
-
-} // namespace parquet