You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ju...@apache.org on 2016/02/11 07:31:26 UTC
parquet-cpp git commit: PARQUET-501: Add OutputStream abstract
interface, refactor encoding code paths
Repository: parquet-cpp
Updated Branches:
refs/heads/master 4d735876d -> c11e7d487
PARQUET-501: Add OutputStream abstract interface, refactor encoding code paths
I also did a bit of tidying / reorganization and gave interfaces more descriptive names.
Author: Wes McKinney <we...@cloudera.com>
Closes #46 from wesm/PARQUET-501 and squashes the following commits:
491aa89 [Wes McKinney]
* Add a basic OutputStream abstract interface and an InMemoryOutputStream implementation for testing.
* Refactor to use OutputStream on data encoding paths, reduce some code duplication in column-reader-test.
* Collect all input/output classes into util/input.* and util/output.*.
* Use int64_t in InputStream::Peek/Read.
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/c11e7d48
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/c11e7d48
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/c11e7d48
Branch: refs/heads/master
Commit: c11e7d487aba2ba91efb261fa20dda4dd6498ac7
Parents: 4d73587
Author: Wes McKinney <we...@cloudera.com>
Authored: Wed Feb 10 22:31:22 2016 -0800
Committer: Julien Le Dem <ju...@dremio.com>
Committed: Wed Feb 10 22:31:22 2016 -0800
----------------------------------------------------------------------
example/parquet-dump-schema.cc | 2 +-
example/parquet_reader.cc | 2 +-
src/parquet/column/column-reader-test.cc | 55 +++-------
src/parquet/column/serialized-page.cc | 3 +-
src/parquet/column/serialized-page.h | 2 +-
src/parquet/column/test-util.h | 100 +++++++++--------
src/parquet/encodings/encodings.h | 8 +-
src/parquet/encodings/plain-encoding-test.cc | 11 +-
src/parquet/encodings/plain-encoding.h | 39 ++++---
src/parquet/parquet.h | 4 +-
src/parquet/reader-test.cc | 3 +-
src/parquet/reader.cc | 47 +-------
src/parquet/reader.h | 47 +-------
src/parquet/util/CMakeLists.txt | 7 +-
src/parquet/util/input.cc | 110 +++++++++++++++++++
src/parquet/util/input.h | 128 ++++++++++++++++++++++
src/parquet/util/input_stream.cc | 63 -----------
src/parquet/util/input_stream.h | 80 --------------
src/parquet/util/output-test.cc | 44 ++++++++
src/parquet/util/output.cc | 73 ++++++++++++
src/parquet/util/output.h | 71 ++++++++++++
src/parquet/util/test-common.h | 23 ++++
22 files changed, 564 insertions(+), 358 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/example/parquet-dump-schema.cc
----------------------------------------------------------------------
diff --git a/example/parquet-dump-schema.cc b/example/parquet-dump-schema.cc
index 9471225..09c715c 100644
--- a/example/parquet-dump-schema.cc
+++ b/example/parquet-dump-schema.cc
@@ -35,7 +35,7 @@ int main(int argc, char** argv) {
std::string filename = argv[1];
parquet_cpp::ParquetFileReader reader;
- parquet_cpp::LocalFile file;
+ parquet_cpp::LocalFileSource file;
file.Open(filename);
if (!file.is_open()) {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/example/parquet_reader.cc
----------------------------------------------------------------------
diff --git a/example/parquet_reader.cc b/example/parquet_reader.cc
index 621f0ba..ca717df 100644
--- a/example/parquet_reader.cc
+++ b/example/parquet_reader.cc
@@ -40,7 +40,7 @@ int main(int argc, char** argv) {
}
parquet_cpp::ParquetFileReader reader;
- parquet_cpp::LocalFile file;
+ parquet_cpp::LocalFileSource file;
file.Open(filename);
if (!file.is_open()) {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
index 0d4aea1..84a36db 100644
--- a/src/parquet/column/column-reader-test.cc
+++ b/src/parquet/column/column-reader-test.cc
@@ -29,6 +29,7 @@
#include "parquet/column/reader.h"
#include "parquet/column/test-util.h"
+#include "parquet/util/output.h"
#include "parquet/util/test-common.h"
using std::string;
@@ -60,31 +61,15 @@ class TestPrimitiveReader : public ::testing::Test {
vector<shared_ptr<Page> > pages_;
};
-template <typename T>
-static vector<T> slice(const vector<T>& values, size_t start, size_t end) {
- if (end < start) {
- return vector<T>(0);
- }
-
- vector<T> out(end - start);
- for (size_t i = start; i < end; ++i) {
- out[i - start] = values[i];
- }
- return out;
-}
-
TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
vector<int32_t> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
- size_t num_values = values.size();
- parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN;
- vector<uint8_t> page1;
- test::DataPageBuilder<Type::INT32> page_builder(&page1);
- page_builder.AppendValues(values, parquet::Encoding::PLAIN);
- pages_.push_back(page_builder.Finish());
+ std::vector<uint8_t> buffer;
+ std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values, {}, 0,
+ {}, 0, &buffer);
+ pages_.push_back(page);
- // TODO: simplify this
NodePtr type = schema::Int32("a", Repetition::REQUIRED);
ColumnDescriptor descr(type, 0, 0);
InitReader(&descr);
@@ -102,21 +87,16 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
ASSERT_TRUE(vector_equal(result, values));
}
+
TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
vector<int32_t> values = {1, 2, 3, 4, 5};
vector<int16_t> def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1};
- size_t num_values = values.size();
- parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN;
-
- vector<uint8_t> page1;
- test::DataPageBuilder<Type::INT32> page_builder(&page1);
-
- // Definition levels precede the values
- page_builder.AppendDefLevels(def_levels, 1, parquet::Encoding::RLE);
- page_builder.AppendValues(values, parquet::Encoding::PLAIN);
+ std::vector<uint8_t> buffer;
+ std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values, def_levels, 1,
+ {}, 0, &buffer);
- pages_.push_back(page_builder.Finish());
+ pages_.push_back(page);
NodePtr type = schema::Int32("a", Repetition::OPTIONAL);
ColumnDescriptor descr(type, 1, 0);
@@ -159,18 +139,11 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
vector<int16_t> def_levels = {2, 1, 1, 2, 2, 1, 1, 2, 2, 1};
vector<int16_t> rep_levels = {0, 1, 1, 0, 0, 1, 1, 0, 0, 1};
- size_t num_values = values.size();
- parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN;
-
- vector<uint8_t> page1;
- test::DataPageBuilder<Type::INT32> page_builder(&page1);
-
- // Definition levels precede the values
- page_builder.AppendRepLevels(rep_levels, 1, parquet::Encoding::RLE);
- page_builder.AppendDefLevels(def_levels, 2, parquet::Encoding::RLE);
- page_builder.AppendValues(values, parquet::Encoding::PLAIN);
+ std::vector<uint8_t> buffer;
+ std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values,
+ def_levels, 2, rep_levels, 1, &buffer);
- pages_.push_back(page_builder.Finish());
+ pages_.push_back(page);
NodePtr type = schema::Int32("a", Repetition::REPEATED);
ColumnDescriptor descr(type, 2, 1);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/column/serialized-page.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/serialized-page.cc b/src/parquet/column/serialized-page.cc
index 1cbaf4d..b9d470c 100644
--- a/src/parquet/column/serialized-page.cc
+++ b/src/parquet/column/serialized-page.cc
@@ -21,7 +21,6 @@
#include "parquet/exception.h"
#include "parquet/thrift/util.h"
-#include "parquet/util/input_stream.h"
using parquet::PageType;
@@ -52,7 +51,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
// Loop here because there may be unhandled page types that we skip until
// finding a page that we do know what to do with
while (true) {
- int bytes_read = 0;
+ int64_t bytes_read = 0;
const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read);
if (bytes_read == 0) {
return std::shared_ptr<Page>(nullptr);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/column/serialized-page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/serialized-page.h b/src/parquet/column/serialized-page.h
index 2735c3c..c02152f 100644
--- a/src/parquet/column/serialized-page.h
+++ b/src/parquet/column/serialized-page.h
@@ -27,7 +27,7 @@
#include "parquet/column/page.h"
#include "parquet/compression/codec.h"
-#include "parquet/util/input_stream.h"
+#include "parquet/util/input.h"
#include "parquet/thrift/parquet_types.h"
namespace parquet_cpp {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
index 8861134..1cbcf8c 100644
--- a/src/parquet/column/test-util.h
+++ b/src/parquet/column/test-util.h
@@ -52,26 +52,22 @@ class MockPageReader : public PageReader {
size_t page_index_;
};
-// TODO(wesm): this is only used for testing for now
-
-static constexpr int DEFAULT_DATA_PAGE_SIZE = 64 * 1024;
-static constexpr int INIT_BUFFER_SIZE = 1024;
+// TODO(wesm): this is only used for testing for now. Refactor to form part of
+// primary file write path
template <int TYPE>
class DataPageBuilder {
public:
typedef typename type_traits<TYPE>::value_type T;
- // The passed vector is the owner of the page's data
- explicit DataPageBuilder(std::vector<uint8_t>* out) :
- out_(out),
- buffer_size_(0),
+ // This class writes data and metadata to the passed inputs
+ explicit DataPageBuilder(InMemoryOutputStream* sink, parquet::DataPageHeader* header) :
+ sink_(sink),
+ header_(header),
num_values_(0),
have_def_levels_(false),
have_rep_levels_(false),
have_values_(false) {
- out_->resize(INIT_BUFFER_SIZE);
- buffer_capacity_ = INIT_BUFFER_SIZE;
}
void AppendDefLevels(const std::vector<int16_t>& levels,
@@ -79,7 +75,7 @@ class DataPageBuilder {
AppendLevels(levels, max_level, encoding);
num_values_ = std::max(levels.size(), num_values_);
- header_.__set_definition_level_encoding(encoding);
+ header_->__set_definition_level_encoding(encoding);
have_def_levels_ = true;
}
@@ -88,7 +84,7 @@ class DataPageBuilder {
AppendLevels(levels, max_level, encoding);
num_values_ = std::max(levels.size(), num_values_);
- header_.__set_repetition_level_encoding(encoding);
+ header_->__set_repetition_level_encoding(encoding);
have_rep_levels_ = true;
}
@@ -98,53 +94,31 @@ class DataPageBuilder {
ParquetException::NYI("only plain encoding currently implemented");
}
size_t bytes_to_encode = values.size() * sizeof(T);
- Reserve(bytes_to_encode);
PlainEncoder<TYPE> encoder(nullptr);
- size_t nbytes = encoder.Encode(&values[0], values.size(), Head());
- // In case for some reason it's fewer than bytes_to_encode
- buffer_size_ += nbytes;
+ encoder.Encode(&values[0], values.size(), sink_);
num_values_ = std::max(values.size(), num_values_);
- header_.__set_encoding(encoding);
+ header_->__set_encoding(encoding);
have_values_ = true;
}
- std::shared_ptr<Page> Finish() {
+ void Finish() {
if (!have_values_) {
throw ParquetException("A data page must at least contain values");
}
- header_.__set_num_values(num_values_);
- return std::make_shared<DataPage>(&(*out_)[0], buffer_size_, header_);
+ header_->__set_num_values(num_values_);
}
private:
- std::vector<uint8_t>* out_;
-
- size_t buffer_size_;
- size_t buffer_capacity_;
-
- parquet::DataPageHeader header_;
+ InMemoryOutputStream* sink_;
+ parquet::DataPageHeader* header_;
size_t num_values_;
-
bool have_def_levels_;
bool have_rep_levels_;
bool have_values_;
- void Reserve(size_t nbytes) {
- while ((nbytes + buffer_size_) > buffer_capacity_) {
- // TODO(wesm): limit to one reserve when this loop runs more than once
- size_t new_capacity = 2 * buffer_capacity_;
- out_->resize(new_capacity);
- buffer_capacity_ = new_capacity;
- }
- }
-
- uint8_t* Head() {
- return &(*out_)[buffer_size_];
- }
-
// Used internally for both repetition and definition levels
void AppendLevels(const std::vector<int16_t>& levels, int16_t max_level,
parquet::Encoding::type encoding) {
@@ -153,9 +127,11 @@ class DataPageBuilder {
}
// TODO: compute a more precise maximum size for the encoded levels
- std::vector<uint8_t> encode_buffer(DEFAULT_DATA_PAGE_SIZE);
-
+ std::vector<uint8_t> encode_buffer(levels.size() * 4);
+ // We encode into separate memory from the output stream because the
+ // RLE-encoded bytes have to be preceded in the stream by their absolute
+ // size.
LevelEncoder encoder;
encoder.Init(encoding, max_level, levels.size(),
encode_buffer.data(), encode_buffer.size());
@@ -163,15 +139,43 @@ class DataPageBuilder {
encoder.Encode(levels.size(), levels.data());
uint32_t rle_bytes = encoder.len();
- size_t levels_footprint = sizeof(uint32_t) + rle_bytes;
- Reserve(levels_footprint);
-
- *reinterpret_cast<uint32_t*>(Head()) = rle_bytes;
- memcpy(Head() + sizeof(uint32_t), encode_buffer.data(), rle_bytes);
- buffer_size_ += levels_footprint;
+ sink_->Write(reinterpret_cast<const uint8_t*>(&rle_bytes), sizeof(uint32_t));
+ sink_->Write(encode_buffer.data(), rle_bytes);
}
};
+template <int TYPE, typename T>
+static std::shared_ptr<DataPage> MakeDataPage(const std::vector<T>& values,
+ const std::vector<int16_t>& def_levels, int16_t max_def_level,
+ const std::vector<int16_t>& rep_levels, int16_t max_rep_level,
+ std::vector<uint8_t>* out_buffer) {
+ size_t num_values = values.size();
+
+ InMemoryOutputStream page_stream;
+ parquet::DataPageHeader page_header;
+
+ test::DataPageBuilder<TYPE> page_builder(&page_stream, &page_header);
+
+ if (!rep_levels.empty()) {
+ page_builder.AppendRepLevels(rep_levels, max_rep_level,
+ parquet::Encoding::RLE);
+ }
+
+ if (!def_levels.empty()) {
+ page_builder.AppendDefLevels(def_levels, max_def_level,
+ parquet::Encoding::RLE);
+ }
+
+ page_builder.AppendValues(values, parquet::Encoding::PLAIN);
+ page_builder.Finish();
+
+ // Hand off the data stream to the passed std::vector
+ page_stream.Transfer(out_buffer);
+
+ return std::make_shared<DataPage>(&(*out_buffer)[0], out_buffer->size(), page_header);
+}
+
+
} // namespace test
} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/encodings/encodings.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encodings.h b/src/parquet/encodings/encodings.h
index 21754d1..46c61b6 100644
--- a/src/parquet/encodings/encodings.h
+++ b/src/parquet/encodings/encodings.h
@@ -23,6 +23,7 @@
#include "parquet/exception.h"
#include "parquet/types.h"
+#include "parquet/util/output.h"
#include "parquet/util/rle-encoding.h"
#include "parquet/util/bit-stream-utils.inline.h"
@@ -82,14 +83,9 @@ class Encoder {
virtual ~Encoder() {}
- // TODO(wesm): use an output stream
-
// Subclasses should override the ones they support
- //
- // @returns: the number of bytes written to dst
- virtual size_t Encode(const T* src, int num_values, uint8_t* dst) {
+ virtual void Encode(const T* src, int num_values, OutputStream* dst) {
throw ParquetException("Encoder does not implement this type.");
- return 0;
}
const parquet::Encoding::type encoding() const { return encoding_; }
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/encodings/plain-encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding-test.cc b/src/parquet/encodings/plain-encoding-test.cc
index ca425dd..16862b8 100644
--- a/src/parquet/encodings/plain-encoding-test.cc
+++ b/src/parquet/encodings/plain-encoding-test.cc
@@ -43,15 +43,18 @@ TEST(BooleanTest, TestEncodeDecode) {
PlainEncoder<Type::BOOLEAN> encoder(nullptr);
PlainDecoder<Type::BOOLEAN> decoder(nullptr);
- std::vector<uint8_t> encode_buffer(nbytes);
+ InMemoryOutputStream dst;
+ encoder.Encode(draws, nvalues, &dst);
- size_t encoded_bytes = encoder.Encode(draws, nvalues, &encode_buffer[0]);
- ASSERT_EQ(nbytes, encoded_bytes);
+ std::vector<uint8_t> encode_buffer;
+ dst.Transfer(&encode_buffer);
+
+ ASSERT_EQ(nbytes, encode_buffer.size());
std::vector<uint8_t> decode_buffer(nbytes);
const uint8_t* decode_data = &decode_buffer[0];
- decoder.SetData(nvalues, &encode_buffer[0], encoded_bytes);
+ decoder.SetData(nvalues, &encode_buffer[0], encode_buffer.size());
size_t values_decoded = decoder.Decode(&decode_buffer[0], nvalues);
ASSERT_EQ(nvalues, values_decoded);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index 03f5940..a450eb4 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -147,7 +147,7 @@ class PlainEncoder : public Encoder<TYPE> {
explicit PlainEncoder(const ColumnDescriptor* descr) :
Encoder<TYPE>(descr, parquet::Encoding::PLAIN) {}
- virtual size_t Encode(const T* src, int num_values, uint8_t* dst);
+ virtual void Encode(const T* src, int num_values, OutputStream* dst);
};
template <>
@@ -156,43 +156,46 @@ class PlainEncoder<Type::BOOLEAN> : public Encoder<Type::BOOLEAN> {
explicit PlainEncoder(const ColumnDescriptor* descr) :
Encoder<Type::BOOLEAN>(descr, parquet::Encoding::PLAIN) {}
- virtual size_t Encode(const bool* src, int num_values, uint8_t* dst) {
+ virtual void Encode(const bool* src, int num_values, OutputStream* dst) {
throw ParquetException("this API for encoding bools not implemented");
- return 0;
}
- size_t Encode(const std::vector<bool>& src, int num_values,
- uint8_t* dst) {
+ void Encode(const std::vector<bool>& src, int num_values, OutputStream* dst) {
size_t bytes_required = BitUtil::RoundUp(num_values, 8) / 8;
- BitWriter bit_writer(dst, bytes_required);
+
+ // TODO(wesm)
+ // Use a temporary buffer for now and copy, because the BitWriter is not
+ // aware of OutputStream. Later we can add some kind of Request/Flush API
+ // to OutputStream
+ std::vector<uint8_t> tmp_buffer(bytes_required);
+
+ BitWriter bit_writer(&tmp_buffer[0], bytes_required);
for (size_t i = 0; i < num_values; ++i) {
bit_writer.PutValue(src[i], 1);
}
bit_writer.Flush();
- return bit_writer.bytes_written();
+
+ // Write the result to the output stream
+ dst->Write(bit_writer.buffer(), bit_writer.bytes_written());
}
};
template <int TYPE>
-inline size_t PlainEncoder<TYPE>::Encode(const T* buffer, int num_values,
- uint8_t* dst) {
- size_t nbytes = num_values * sizeof(T);
- memcpy(dst, buffer, nbytes);
- return nbytes;
+inline void PlainEncoder<TYPE>::Encode(const T* buffer, int num_values,
+ OutputStream* dst) {
+ dst->Write(reinterpret_cast<const uint8_t*>(buffer), num_values * sizeof(T));
}
template <>
-inline size_t PlainEncoder<Type::BYTE_ARRAY>::Encode(const ByteArray* src,
- int num_values, uint8_t* dst) {
+inline void PlainEncoder<Type::BYTE_ARRAY>::Encode(const ByteArray* src,
+ int num_values, OutputStream* dst) {
ParquetException::NYI("byte array encoding");
- return 0;
}
template <>
-inline size_t PlainEncoder<Type::FIXED_LEN_BYTE_ARRAY>::Encode(
- const FixedLenByteArray* src, int num_values, uint8_t* dst) {
+inline void PlainEncoder<Type::FIXED_LEN_BYTE_ARRAY>::Encode(
+ const FixedLenByteArray* src, int num_values, OutputStream* dst) {
ParquetException::NYI("FLBA encoding");
- return 0;
}
} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/parquet.h
----------------------------------------------------------------------
diff --git a/src/parquet/parquet.h b/src/parquet/parquet.h
index 84a32f3..7030d0e 100644
--- a/src/parquet/parquet.h
+++ b/src/parquet/parquet.h
@@ -29,6 +29,8 @@
#include "parquet/exception.h"
#include "parquet/reader.h"
#include "parquet/column/reader.h"
-#include "parquet/util/input_stream.h"
+
+#include "parquet/util/input.h"
+#include "parquet/util/output.h"
#endif
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index ffc882c..8da8b99 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -25,6 +25,7 @@
#include "parquet/reader.h"
#include "parquet/column/reader.h"
#include "parquet/column/scanner.h"
+#include "parquet/util/input.h"
using std::string;
@@ -47,7 +48,7 @@ class TestAllTypesPlain : public ::testing::Test {
void TearDown() {}
protected:
- LocalFile file_;
+ LocalFileSource file_;
ParquetFileReader reader_;
};
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader.cc b/src/parquet/reader.cc
index 2f30ebf..3fcce90 100644
--- a/src/parquet/reader.cc
+++ b/src/parquet/reader.cc
@@ -31,7 +31,6 @@
#include "parquet/exception.h"
#include "parquet/schema/converter.h"
#include "parquet/thrift/util.h"
-#include "parquet/util/input_stream.h"
using std::string;
using std::vector;
@@ -39,48 +38,6 @@ using std::vector;
namespace parquet_cpp {
// ----------------------------------------------------------------------
-// LocalFile methods
-
-LocalFile::~LocalFile() {
- CloseFile();
-}
-
-void LocalFile::Open(const std::string& path) {
- path_ = path;
- file_ = fopen(path_.c_str(), "r");
- is_open_ = true;
-}
-
-void LocalFile::Close() {
- // Pure virtual
- CloseFile();
-}
-
-void LocalFile::CloseFile() {
- if (is_open_) {
- fclose(file_);
- is_open_ = false;
- }
-}
-
-size_t LocalFile::Size() {
- fseek(file_, 0L, SEEK_END);
- return Tell();
-}
-
-void LocalFile::Seek(size_t pos) {
- fseek(file_, pos, SEEK_SET);
-}
-
-size_t LocalFile::Tell() {
- return ftell(file_);
-}
-
-size_t LocalFile::Read(size_t nbytes, uint8_t* buffer) {
- return fread(buffer, 1, nbytes, file_);
-}
-
-// ----------------------------------------------------------------------
// RowGroupReader
std::shared_ptr<ColumnReader> RowGroupReader::Column(size_t i) {
@@ -102,7 +59,7 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(size_t i) {
std::unique_ptr<InputStream> input(
new ScopedInMemoryInputStream(col.meta_data.total_compressed_size));
- FileLike* source = this->parent_->buffer_;
+ RandomAccessSource* source = this->parent_->buffer_;
source->Seek(col_start);
@@ -141,7 +98,7 @@ ParquetFileReader::ParquetFileReader() :
ParquetFileReader::~ParquetFileReader() {}
-void ParquetFileReader::Open(FileLike* buffer) {
+void ParquetFileReader::Open(RandomAccessSource* buffer) {
buffer_ = buffer;
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/reader.h b/src/parquet/reader.h
index ea23182..3a9dc5d 100644
--- a/src/parquet/reader.h
+++ b/src/parquet/reader.h
@@ -27,53 +27,12 @@
#include "parquet/thrift/parquet_types.h"
#include "parquet/types.h"
-
#include "parquet/schema/descriptor.h"
+#include "parquet/util/input.h"
namespace parquet_cpp {
class ColumnReader;
-
-class FileLike {
- public:
- virtual ~FileLike() {}
-
- virtual void Close() = 0;
- virtual size_t Size() = 0;
- virtual size_t Tell() = 0;
- virtual void Seek(size_t pos) = 0;
-
- // Returns actual number of bytes read
- virtual size_t Read(size_t nbytes, uint8_t* out) = 0;
-};
-
-
-class LocalFile : public FileLike {
- public:
- LocalFile() : file_(nullptr), is_open_(false) {}
- virtual ~LocalFile();
-
- void Open(const std::string& path);
-
- virtual void Close();
- virtual size_t Size();
- virtual size_t Tell();
- virtual void Seek(size_t pos);
-
- // Returns actual number of bytes read
- virtual size_t Read(size_t nbytes, uint8_t* out);
-
- bool is_open() const { return is_open_;}
- const std::string& path() const { return path_;}
-
- private:
- void CloseFile();
-
- std::string path_;
- FILE* file_;
- bool is_open_;
-};
-
class ParquetFileReader;
class RowGroupReader {
@@ -112,7 +71,7 @@ class ParquetFileReader {
// This class does _not_ take ownership of the file. You must manage its
// lifetime separately
- void Open(FileLike* buffer);
+ void Open(RandomAccessSource* buffer);
void Close();
@@ -150,7 +109,7 @@ class ParquetFileReader {
// Row group index -> RowGroupReader
std::unordered_map<int, std::shared_ptr<RowGroupReader> > row_group_readers_;
- FileLike* buffer_;
+ RandomAccessSource* buffer_;
};
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/util/CMakeLists.txt b/src/parquet/util/CMakeLists.txt
index 046a7c9..504069f 100644
--- a/src/parquet/util/CMakeLists.txt
+++ b/src/parquet/util/CMakeLists.txt
@@ -27,11 +27,13 @@ install(FILES
macros.h
rle-encoding.h
stopwatch.h
- input_stream.h
+ input.h
+ output.h
DESTINATION include/parquet/util)
add_library(parquet_util STATIC
- input_stream.cc
+ input.cc
+ output.cc
cpu-info.cc
)
@@ -54,4 +56,5 @@ if(PARQUET_BUILD_TESTS)
endif()
ADD_PARQUET_TEST(bit-util-test)
+ADD_PARQUET_TEST(output-test)
ADD_PARQUET_TEST(rle-test)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/input.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.cc b/src/parquet/util/input.cc
new file mode 100644
index 0000000..0e4b833
--- /dev/null
+++ b/src/parquet/util/input.cc
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/util/input.h"
+
+#include <algorithm>
+#include <string>
+
+#include "parquet/exception.h"
+
+namespace parquet_cpp {
+
+// ----------------------------------------------------------------------
+// LocalFileSource
+
+LocalFileSource::~LocalFileSource() {
+ CloseFile();
+}
+
+void LocalFileSource::Open(const std::string& path) {
+ path_ = path;
+ file_ = fopen(path_.c_str(), "r");
+ is_open_ = true;
+}
+
+void LocalFileSource::Close() {
+ // Pure virtual
+ CloseFile();
+}
+
+void LocalFileSource::CloseFile() {
+ if (is_open_) {
+ fclose(file_);
+ is_open_ = false;
+ }
+}
+
+size_t LocalFileSource::Size() {
+ fseek(file_, 0L, SEEK_END);
+ return Tell();
+}
+
+void LocalFileSource::Seek(size_t pos) {
+ fseek(file_, pos, SEEK_SET);
+}
+
+size_t LocalFileSource::Tell() {
+ return ftell(file_);
+}
+
+size_t LocalFileSource::Read(size_t nbytes, uint8_t* buffer) {
+ return fread(buffer, 1, nbytes, file_);
+}
+
+// ----------------------------------------------------------------------
+// InMemoryInputStream
+
+InMemoryInputStream::InMemoryInputStream(const uint8_t* buffer, int64_t len) :
+ buffer_(buffer), len_(len), offset_(0) {}
+
+const uint8_t* InMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
+ *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_);
+ return buffer_ + offset_;
+}
+
+const uint8_t* InMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
+ const uint8_t* result = Peek(num_to_read, num_bytes);
+ offset_ += *num_bytes;
+ return result;
+}
+
+// ----------------------------------------------------------------------
+// ScopedInMemoryInputStream:: like InMemoryInputStream but owns its memory
+
+ScopedInMemoryInputStream::ScopedInMemoryInputStream(int64_t len) {
+ buffer_.resize(len);
+ stream_.reset(new InMemoryInputStream(buffer_.data(), buffer_.size()));
+}
+
+uint8_t* ScopedInMemoryInputStream::data() {
+ return buffer_.data();
+}
+
+int64_t ScopedInMemoryInputStream::size() {
+ return buffer_.size();
+}
+
+const uint8_t* ScopedInMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
+ return stream_->Peek(num_to_peek, num_bytes);
+}
+
+const uint8_t* ScopedInMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
+ return stream_->Read(num_to_read, num_bytes);
+}
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/input.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.h b/src/parquet/util/input.h
new file mode 100644
index 0000000..4fd9cd7
--- /dev/null
+++ b/src/parquet/util/input.h
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_UTIL_INPUT_H
+#define PARQUET_UTIL_INPUT_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+namespace parquet_cpp {
+
+// ----------------------------------------------------------------------
+// Random access input (e.g. file-like)
+
+// Random
+class RandomAccessSource {
+ public:
+ virtual ~RandomAccessSource() {}
+
+ virtual void Close() = 0;
+ virtual size_t Size() = 0;
+ virtual size_t Tell() = 0;
+ virtual void Seek(size_t pos) = 0;
+
+ // Returns actual number of bytes read
+ virtual size_t Read(size_t nbytes, uint8_t* out) = 0;
+};
+
+
+class LocalFileSource : public RandomAccessSource {
+ public:
+ LocalFileSource() : file_(nullptr), is_open_(false) {}
+ virtual ~LocalFileSource();
+
+ void Open(const std::string& path);
+
+ virtual void Close();
+ virtual size_t Size();
+ virtual size_t Tell();
+ virtual void Seek(size_t pos);
+
+ // Returns actual number of bytes read
+ virtual size_t Read(size_t nbytes, uint8_t* out);
+
+ bool is_open() const { return is_open_;}
+ const std::string& path() const { return path_;}
+
+ private:
+ void CloseFile();
+
+ std::string path_;
+ FILE* file_;
+ bool is_open_;
+};
+
+// ----------------------------------------------------------------------
+// Streaming input interfaces
+
+// Interface for the column reader to get the bytes. The interface is a stream
+// interface, meaning the bytes in order and once a byte is read, it does not
+// need to be read again.
+class InputStream {
+ public:
+ // Returns the next 'num_to_peek' without advancing the current position.
+ // *num_bytes will contain the number of bytes returned which can only be
+ // less than num_to_peek at end of stream cases.
+ // Since the position is not advanced, calls to this function are idempotent.
+ // The buffer returned to the caller is still owned by the input stream and must
+ // stay valid until the next call to Peek() or Read().
+ virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) = 0;
+
+ // Identical to Peek(), except the current position in the stream is advanced by
+ // *num_bytes.
+ virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) = 0;
+
+ virtual ~InputStream() {}
+
+ protected:
+ InputStream() {}
+};
+
+// Implementation of an InputStream when all the bytes are in memory.
+class InMemoryInputStream : public InputStream {
+ public:
+ InMemoryInputStream(const uint8_t* buffer, int64_t len);
+ virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes);
+ virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes);
+
+ private:
+ const uint8_t* buffer_;
+ int64_t len_;
+ int64_t offset_;
+};
+
+
+// A wrapper for InMemoryInputStream to manage the memory.
+class ScopedInMemoryInputStream : public InputStream {
+ public:
+ explicit ScopedInMemoryInputStream(int64_t len);
+ uint8_t* data();
+ int64_t size();
+ virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes);
+ virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes);
+
+ private:
+ std::vector<uint8_t> buffer_;
+ std::unique_ptr<InMemoryInputStream> stream_;
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_UTIL_INPUT_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/input_stream.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input_stream.cc b/src/parquet/util/input_stream.cc
deleted file mode 100644
index 281a342..0000000
--- a/src/parquet/util/input_stream.cc
+++ /dev/null
@@ -1,63 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/util/input_stream.h"
-
-#include <algorithm>
-
-#include "parquet/exception.h"
-
-namespace parquet_cpp {
-
-InMemoryInputStream::InMemoryInputStream(const uint8_t* buffer, int64_t len) :
- buffer_(buffer), len_(len), offset_(0) {}
-
-const uint8_t* InMemoryInputStream::Peek(int num_to_peek, int* num_bytes) {
- *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_);
- return buffer_ + offset_;
-}
-
-const uint8_t* InMemoryInputStream::Read(int num_to_read, int* num_bytes) {
- const uint8_t* result = Peek(num_to_read, num_bytes);
- offset_ += *num_bytes;
- return result;
-}
-
-ScopedInMemoryInputStream::ScopedInMemoryInputStream(int64_t len) {
- buffer_.resize(len);
- stream_.reset(new InMemoryInputStream(buffer_.data(), buffer_.size()));
-}
-
-uint8_t* ScopedInMemoryInputStream::data() {
- return buffer_.data();
-}
-
-int64_t ScopedInMemoryInputStream::size() {
- return buffer_.size();
-}
-
-const uint8_t* ScopedInMemoryInputStream::Peek(int num_to_peek,
- int* num_bytes) {
- return stream_->Peek(num_to_peek, num_bytes);
-}
-
-const uint8_t* ScopedInMemoryInputStream::Read(int num_to_read,
- int* num_bytes) {
- return stream_->Read(num_to_read, num_bytes);
-}
-
-} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/input_stream.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/input_stream.h b/src/parquet/util/input_stream.h
deleted file mode 100644
index ece2488..0000000
--- a/src/parquet/util/input_stream.h
+++ /dev/null
@@ -1,80 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_INPUT_STREAM_H
-#define PARQUET_INPUT_STREAM_H
-
-#include <cstdint>
-#include <memory>
-#include <vector>
-
-namespace parquet_cpp {
-
-// Interface for the column reader to get the bytes. The interface is a stream
-// interface, meaning the bytes in order and once a byte is read, it does not
-// need to be read again.
-class InputStream {
- public:
- // Returns the next 'num_to_peek' without advancing the current position.
- // *num_bytes will contain the number of bytes returned which can only be
- // less than num_to_peek at end of stream cases.
- // Since the position is not advanced, calls to this function are idempotent.
- // The buffer returned to the caller is still owned by the input stream and must
- // stay valid until the next call to Peek() or Read().
- virtual const uint8_t* Peek(int num_to_peek, int* num_bytes) = 0;
-
- // Identical to Peek(), except the current position in the stream is advanced by
- // *num_bytes.
- virtual const uint8_t* Read(int num_to_read, int* num_bytes) = 0;
-
- virtual ~InputStream() {}
-
- protected:
- InputStream() {}
-};
-
-// Implementation of an InputStream when all the bytes are in memory.
-class InMemoryInputStream : public InputStream {
- public:
- InMemoryInputStream(const uint8_t* buffer, int64_t len);
- virtual const uint8_t* Peek(int num_to_peek, int* num_bytes);
- virtual const uint8_t* Read(int num_to_read, int* num_bytes);
-
- private:
- const uint8_t* buffer_;
- int64_t len_;
- int64_t offset_;
-};
-
-
-// A wrapper for InMemoryInputStream to manage the memory.
-class ScopedInMemoryInputStream : public InputStream {
- public:
- explicit ScopedInMemoryInputStream(int64_t len);
- uint8_t* data();
- int64_t size();
- virtual const uint8_t* Peek(int num_to_peek, int* num_bytes);
- virtual const uint8_t* Read(int num_to_read, int* num_bytes);
-
- private:
- std::vector<uint8_t> buffer_;
- std::unique_ptr<InMemoryInputStream> stream_;
-};
-
-} // namespace parquet_cpp
-
-#endif // PARQUET_INPUT_STREAM_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/output-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/output-test.cc b/src/parquet/util/output-test.cc
new file mode 100644
index 0000000..84f5b57
--- /dev/null
+++ b/src/parquet/util/output-test.cc
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+
+#include <gtest/gtest.h>
+
+#include "parquet/util/output.h"
+#include "parquet/util/test-common.h"
+
+namespace parquet_cpp {
+
+TEST(TestInMemoryOutputStream, Basics) {
+  // The payload (13 bytes) is larger than the initial capacity (8), so the
+  // second write has to grow the stream's internal buffer.
+  const std::vector<uint8_t> expected = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
+  const size_t first_chunk = 4;
+
+  std::unique_ptr<InMemoryOutputStream> stream(new InMemoryOutputStream(8));
+
+  // Write the payload in two pieces and check the position in between.
+  stream->Write(expected.data(), first_chunk);
+  ASSERT_EQ(4, stream->Tell());
+  stream->Write(expected.data() + first_chunk, expected.size() - first_chunk);
+
+  // Transfer hands the written bytes to 'actual' and rewinds the stream.
+  std::vector<uint8_t> actual;
+  stream->Transfer(&actual);
+
+  test::assert_vector_equal(expected, actual);
+
+  ASSERT_EQ(0, stream->Tell());
+}
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/output.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/output.cc b/src/parquet/util/output.cc
new file mode 100644
index 0000000..9748a69
--- /dev/null
+++ b/src/parquet/util/output.cc
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/util/output.h"
+
+#include <algorithm>
+#include <cstring>
+#include <sstream>
+
+#include "parquet/exception.h"
+
+namespace parquet_cpp {
+
+// ----------------------------------------------------------------------
+// In-memory output stream
+
+static constexpr int64_t IN_MEMORY_DEFAULT_CAPACITY = 1024;
+
+// Creates a stream whose internal buffer starts with 'initial_capacity' bytes.
+// A non-positive capacity falls back to IN_MEMORY_DEFAULT_CAPACITY.
+InMemoryOutputStream::InMemoryOutputStream(int64_t initial_capacity) :
+    size_(0),
+    capacity_(initial_capacity > 0 ? initial_capacity
+                                   : IN_MEMORY_DEFAULT_CAPACITY) {
+  // capacity_ has to match the buffer that is actually allocated: Write() grows
+  // the buffer by doubling capacity_, and a recorded capacity of 0 never grows.
+  buffer_.resize(capacity_);
+}
+
+// Default constructor: delegates to the sized constructor, passing
+// IN_MEMORY_DEFAULT_CAPACITY as the initial capacity.
+InMemoryOutputStream::InMemoryOutputStream() :
+ InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY) {}
+
+// Returns a mutable pointer to the current write position, i.e. the byte just
+// past the data written so far.
+uint8_t* InMemoryOutputStream::Head() {
+  // Use pointer arithmetic on data() rather than &buffer_[size_]: when the
+  // buffer is exactly full, size_ == buffer_.size() and operator[] would be
+  // indexed out of range, whereas a one-past-the-end pointer is well defined.
+  return buffer_.data() + size_;
+}
+
+// Appends 'length' bytes starting at 'data' to the stream, growing the internal
+// buffer when it is too small. Calls with a non-positive length are ignored.
+void InMemoryOutputStream::Write(const uint8_t* data, int64_t length) {
+  if (length <= 0) {
+    // Nothing to copy; this also avoids handing a possibly-null 'data' pointer
+    // (or a negative size) to memcpy.
+    return;
+  }
+
+  const int64_t required = size_ + length;
+  if (required > capacity_) {
+    // Grow geometrically. The starting point must be non-zero: capacity_ can be
+    // 0 (for example after Transfer() into an empty vector), and doubling 0
+    // would never reach 'required'.
+    int64_t new_capacity =
+        capacity_ > 0 ? capacity_ : IN_MEMORY_DEFAULT_CAPACITY;
+    while (new_capacity < required) {
+      new_capacity *= 2;
+    }
+    buffer_.resize(new_capacity);
+    capacity_ = new_capacity;
+  }
+  memcpy(Head(), data, length);
+  size_ += length;
+}
+
+// Returns the current position in the stream, which is the number of bytes
+// recorded in size_.
+int64_t InMemoryOutputStream::Tell() {
+ return size_;
+}
+
+// Moves the bytes written so far into '*out' and rewinds the stream to
+// position 0. The vector previously held by '*out' becomes the stream's new
+// internal buffer, so an empty '*out' leaves the stream with zero capacity.
+void InMemoryOutputStream::Transfer(std::vector<uint8_t>* out) {
+  // Trim the unused tail so that '*out' receives exactly the written bytes.
+  buffer_.resize(size_);
+  out->swap(buffer_);
+
+  // Whatever '*out' contained before is now the stream's backing storage.
+  capacity_ = buffer_.size();
+  size_ = 0;
+}
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/output.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/output.h b/src/parquet/util/output.h
new file mode 100644
index 0000000..e83b261
--- /dev/null
+++ b/src/parquet/util/output.h
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_UTIL_OUTPUT_H
+#define PARQUET_UTIL_OUTPUT_H
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+namespace parquet_cpp {
+
+// ----------------------------------------------------------------------
+// Output stream classes
+
+// Abstract output stream
+class OutputStream {
+ public:
+  // Virtual so that a concrete stream is destroyed correctly when it is deleted
+  // through an OutputStream pointer.
+  virtual ~OutputStream() {}
+
+  // Close the output stream
+  virtual void Close() = 0;
+
+  // Return the current position in the output stream relative to the start
+  virtual int64_t Tell() = 0;
+
+  // Copy bytes into the output stream
+  virtual void Write(const uint8_t* data, int64_t length) = 0;
+};
+
+
+// An output stream that writes into an in-memory buffer. The accumulated bytes
+// can be handed to a std::vector with Transfer().
+class InMemoryOutputStream : public OutputStream {
+ public:
+ InMemoryOutputStream();
+ explicit InMemoryOutputStream(int64_t initial_capacity);
+
+ // Close is currently a no-op with the in-memory stream
+ virtual void Close() {}
+
+ virtual int64_t Tell();
+
+ virtual void Write(const uint8_t* data, int64_t length);
+
+ // Hand off the in-memory data to a (preferably-empty) std::vector owner
+ void Transfer(std::vector<uint8_t>* out);
+
+ private:
+ // Mutable pointer to the current write position in the stream
+ uint8_t* Head();
+
+ // Backing storage for the written bytes
+ std::vector<uint8_t> buffer_;
+ // Number of bytes written so far; this is the value reported by Tell()
+ int64_t size_;
+ // Tracked size of buffer_, used to decide when a write has to grow it
+ int64_t capacity_;
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_UTIL_OUTPUT_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/test-common.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/test-common.h b/src/parquet/util/test-common.h
index 3cf82f5..84519d6 100644
--- a/src/parquet/util/test-common.h
+++ b/src/parquet/util/test-common.h
@@ -29,6 +29,16 @@ namespace parquet_cpp {
namespace test {
template <typename T>
+// Asserts that 'left' and 'right' have the same size and that every pair of
+// elements compares equal; on an element mismatch the offending index is
+// appended to the failure message.
+// Note: a failing ASSERT_* returns from this helper only, not from the test
+// that called it.
+static inline void assert_vector_equal(const vector<T>& left,
+ const vector<T>& right) {
+ ASSERT_EQ(left.size(), right.size());
+
+ for (size_t i = 0; i < left.size(); ++i) {
+ ASSERT_EQ(left[i], right[i]) << i;
+ }
+}
+
+template <typename T>
static inline bool vector_equal(const vector<T>& left, const vector<T>& right) {
if (left.size() != right.size()) {
return false;
@@ -47,6 +57,19 @@ static inline bool vector_equal(const vector<T>& left, const vector<T>& right) {
return true;
}
+// Returns a copy of the half-open range values[start, end).
+// 'end' is clamped to values.size(), so an over-long request never reads past
+// the input; an empty vector is returned when the resulting range is empty or
+// inverted (including when 'start' lies beyond the end of 'values').
+template <typename T>
+static vector<T> slice(const vector<T>& values, size_t start, size_t end) {
+  if (end > values.size()) {
+    end = values.size();
+  }
+  if (end <= start) {
+    return vector<T>();
+  }
+  // Iterator-range construction copies the elements directly and does not
+  // require T to be default-constructible.
+  return vector<T>(values.begin() + start, values.begin() + end);
+}
+
static inline vector<bool> flip_coins_seed(size_t n, double p, uint32_t seed) {
std::mt19937 gen(seed);
std::bernoulli_distribution d(p);