You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by no...@apache.org on 2016/02/02 19:50:19 UTC
parquet-cpp git commit: PARQUET-485: Decouple page deserialization
from column reader to facilitate unit testing
Repository: parquet-cpp
Updated Branches:
refs/heads/master 2b935ae96 -> 08088af76
PARQUET-485: Decouple page deserialization from column reader to facilitate unit testing
Several things in this patch:
* Adds PageReader abstraction, and a SerializedPageReader implementation
according to the Parquet file format
* Adds a MockPageReader and a couple unit tests demonstrating end-to-end test
without creating a Parquet file
* Adds a DataPageBuilder test fixture tool, may become part of the main write
path later
* Adds PlainEncoder implementation for a few primitive types
* Fixes a few ColumnReader bugs exposed by the unit tests
Author: Wes McKinney <we...@cloudera.com>
Closes #32 from wesm/PARQUET-485 and squashes the following commits:
aa33078 [Wes McKinney] Fix function doc
e897a81 [Wes McKinney] Restore NumRequiredBits function after rebase
ee4d97a [Wes McKinney] Change PageReader::NextPage API to return shared_ptr<Page>(nullptr) on eos
0324021 [Wes McKinney] Clarify some comments
ec871c4 [Wes McKinney] Add include guards
e63bbdd [Wes McKinney] Move vector_equal to util/test-common.h
44a78a1 [Wes McKinney] Refactor to decouple page deserialization from column reader so that mock data pages can be constructed in unit tests.
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/08088af7
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/08088af7
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/08088af7
Branch: refs/heads/master
Commit: 08088af76ec2357318e045f0696901e2e6e79fbf
Parents: 2b935ae
Author: Wes McKinney <we...@cloudera.com>
Authored: Tue Feb 2 10:50:17 2016 -0800
Committer: Nong Li <no...@gmail.com>
Committed: Tue Feb 2 10:50:17 2016 -0800
----------------------------------------------------------------------
CMakeLists.txt | 2 +
src/parquet/column/CMakeLists.txt | 4 +
src/parquet/column/column-reader-test.cc | 165 +++++++++++++++++++++++
src/parquet/column/page.h | 132 ++++++++++++++++++
src/parquet/column/reader.cc | 164 +++++++++++------------
src/parquet/column/reader.h | 80 +++++------
src/parquet/column/serialized-page.cc | 103 ++++++++++++++
src/parquet/column/serialized-page.h | 61 +++++++++
src/parquet/column/test-util.h | 184 ++++++++++++++++++++++++++
src/parquet/encodings/encodings.h | 34 +++++
src/parquet/encodings/plain-encoding.h | 56 +++++++-
src/parquet/reader.cc | 13 +-
src/parquet/reader.h | 4 +
src/parquet/util/bit-util.h | 8 ++
src/parquet/util/test-common.h | 53 ++++++++
15 files changed, 924 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 173d676..d379e3d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -214,6 +214,7 @@ set(PARQUET_TEST_LINK_LIBS ${PARQUET_MIN_TEST_LIBS})
# Library config
set(LIBPARQUET_SRCS
+ src/parquet/column/serialized-page.cc
src/parquet/column/reader.cc
src/parquet/column/scanner.cc
src/parquet/reader.cc
@@ -246,6 +247,7 @@ if(APPLE)
endif()
add_subdirectory(src/parquet)
+add_subdirectory(src/parquet/column)
add_subdirectory(src/parquet/compression)
add_subdirectory(src/parquet/encodings)
add_subdirectory(src/parquet/thrift)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/column/CMakeLists.txt b/src/parquet/column/CMakeLists.txt
index 20f6167..7eb334e 100644
--- a/src/parquet/column/CMakeLists.txt
+++ b/src/parquet/column/CMakeLists.txt
@@ -17,6 +17,10 @@
# Headers: top level
install(FILES
+ page.h
reader.h
+ serialized-page.h
scanner.h
DESTINATION include/parquet/column)
+
+ADD_PARQUET_TEST(column-reader-test)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
new file mode 100644
index 0000000..88f4465
--- /dev/null
+++ b/src/parquet/column/column-reader-test.cc
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdlib>
+#include <iostream>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "parquet/types.h"
+#include "parquet/column/page.h"
+#include "parquet/column/reader.h"
+#include "parquet/column/test-util.h"
+
+#include "parquet/util/test-common.h"
+
+using std::string;
+using std::vector;
+using std::shared_ptr;
+using parquet::FieldRepetitionType;
+using parquet::SchemaElement;
+using parquet::Encoding;
+using parquet::Type;
+
+namespace parquet_cpp {
+
+namespace test {
+
+class TestPrimitiveReader : public ::testing::Test {
+ public:
+ void SetUp() {}
+
+ void TearDown() {}
+
+ void InitReader(const SchemaElement* element) {
+ pager_.reset(new test::MockPageReader(pages_));
+ reader_ = ColumnReader::Make(element, std::move(pager_));
+ }
+
+ protected:
+ std::shared_ptr<ColumnReader> reader_;
+ std::unique_ptr<PageReader> pager_;
+ vector<shared_ptr<Page> > pages_;
+};
+
+template <typename T>
+static vector<T> slice(const vector<T>& values, size_t start, size_t end) {
+ if (end < start) {
+ return vector<T>(0);
+ }
+
+ vector<T> out(end - start);
+ for (size_t i = start; i < end; ++i) {
+ out[i - start] = values[i];
+ }
+ return out;
+}
+
+
+TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
+ vector<int32_t> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+ size_t num_values = values.size();
+ Encoding::type value_encoding = Encoding::PLAIN;
+
+ vector<uint8_t> page1;
+ test::DataPageBuilder<Type::INT32> page_builder(&page1);
+ page_builder.AppendValues(values, Encoding::PLAIN);
+ pages_.push_back(page_builder.Finish());
+
+ // TODO: simplify this
+ SchemaElement element;
+ element.__set_type(Type::INT32);
+ element.__set_repetition_type(FieldRepetitionType::REQUIRED);
+ InitReader(&element);
+
+ Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
+
+ vector<int32_t> result(10, -1);
+
+ size_t values_read = 0;
+ size_t batch_actual = reader->ReadBatch(10, nullptr, nullptr,
+ &result[0], &values_read);
+ ASSERT_EQ(10, batch_actual);
+ ASSERT_EQ(10, values_read);
+
+ ASSERT_TRUE(vector_equal(result, values));
+}
+
+TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
+ vector<int32_t> values = {1, 2, 3, 4, 5};
+ vector<int16_t> def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1};
+
+ size_t num_values = values.size();
+ Encoding::type value_encoding = Encoding::PLAIN;
+
+ vector<uint8_t> page1;
+ test::DataPageBuilder<Type::INT32> page_builder(&page1);
+
+ // Definition levels precede the values
+ page_builder.AppendDefLevels(def_levels, 1, Encoding::RLE);
+ page_builder.AppendValues(values, Encoding::PLAIN);
+
+ pages_.push_back(page_builder.Finish());
+
+ // TODO: simplify this
+ SchemaElement element;
+ element.__set_type(Type::INT32);
+ element.__set_repetition_type(FieldRepetitionType::OPTIONAL);
+ InitReader(&element);
+
+ Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
+
+ std::vector<int32_t> vexpected;
+ std::vector<int16_t> dexpected;
+
+ size_t values_read = 0;
+ size_t batch_actual = 0;
+
+ vector<int32_t> vresult(3, -1);
+ vector<int16_t> dresult(5, -1);
+
+ batch_actual = reader->ReadBatch(5, &dresult[0], nullptr,
+ &vresult[0], &values_read);
+ ASSERT_EQ(5, batch_actual);
+ ASSERT_EQ(3, values_read);
+
+ ASSERT_TRUE(vector_equal(vresult, slice(values, 0, 3)));
+ ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 0, 5)));
+
+ batch_actual = reader->ReadBatch(5, &dresult[0], nullptr,
+ &vresult[0], &values_read);
+ ASSERT_EQ(5, batch_actual);
+ ASSERT_EQ(2, values_read);
+
+ ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values, 3, 5)));
+ ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 5, 10)));
+
+ // EOS, pass all nullptrs to check for improper writes. Do not segfault /
+ // core dump
+ batch_actual = reader->ReadBatch(5, nullptr, nullptr,
+ nullptr, &values_read);
+ ASSERT_EQ(0, batch_actual);
+ ASSERT_EQ(0, values_read);
+}
+
+} // namespace test
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
new file mode 100644
index 0000000..46f5d62
--- /dev/null
+++ b/src/parquet/column/page.h
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#ifndef PARQUET_COLUMN_PAGE_H
+#define PARQUET_COLUMN_PAGE_H
+
+#include "parquet/thrift/parquet_types.h"
+
+namespace parquet_cpp {
+
+// Note: Copying the specific page header Thrift metadata to the Page object
+// (instead of using a pointer) presently so that data pages can be
+// decompressed and processed in parallel. We can turn the header members of
+// these classes into pointers at some point, but the downside is that
+// applications materializing multiple data pages at once will have to have a
+// data container that manages the lifetime of the deserialized
+// parquet::PageHeader structs.
+//
+// TODO: Parallel processing is not yet safe because of memory-ownership
+// semantics (the PageReader may or may not own the memory referenced by a
+// page)
+class Page {
+ // TODO(wesm): In the future Parquet implementations may store the crc code
+ // in parquet::PageHeader. parquet-mr currently does not, so we also skip it
+ // here, both on the read and write path
+ public:
+ Page(const uint8_t* buffer, size_t buffer_size, parquet::PageType::type type) :
+ buffer_(buffer),
+ buffer_size_(buffer_size),
+ type_(type) {}
+
+ parquet::PageType::type type() const {
+ return type_;
+ }
+
+ // @returns: a pointer to the page's data
+ const uint8_t* data() const {
+ return buffer_;
+ }
+
+ // @returns: the total size in bytes of the page's data buffer
+ size_t size() const {
+ return buffer_size_;
+ }
+
+ private:
+ const uint8_t* buffer_;
+ size_t buffer_size_;
+
+ parquet::PageType::type type_;
+};
+
+
+class DataPage : public Page {
+ public:
+ DataPage(const uint8_t* buffer, size_t buffer_size,
+ const parquet::DataPageHeader& header) :
+ Page(buffer, buffer_size, parquet::PageType::DATA_PAGE),
+ header_(header) {}
+
+ size_t num_values() const {
+ return header_.num_values;
+ }
+
+ parquet::Encoding::type encoding() const {
+ return header_.encoding;
+ }
+
+ private:
+ parquet::DataPageHeader header_;
+};
+
+
+class DataPageV2 : public Page {
+ public:
+ DataPageV2(const uint8_t* buffer, size_t buffer_size,
+ const parquet::DataPageHeaderV2& header) :
+ Page(buffer, buffer_size, parquet::PageType::DATA_PAGE_V2),
+ header_(header) {}
+
+ private:
+ parquet::DataPageHeaderV2 header_;
+};
+
+
+class DictionaryPage : public Page {
+ public:
+ DictionaryPage(const uint8_t* buffer, size_t buffer_size,
+ const parquet::DictionaryPageHeader& header) :
+ Page(buffer, buffer_size, parquet::PageType::DICTIONARY_PAGE),
+ header_(header) {}
+
+ size_t num_values() const {
+ return header_.num_values;
+ }
+
+ private:
+ parquet::DictionaryPageHeader header_;
+};
+
+// Abstract page iterator interface. This way, we can feed column pages to the
+// ColumnReader through whatever mechanism we choose
+class PageReader {
+ public:
+ virtual ~PageReader() {}
+
+ // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
+ // containing new Page otherwise
+ virtual std::shared_ptr<Page> NextPage() = 0;
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_COLUMN_PAGE_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc
index edfea49..91e026a 100644
--- a/src/parquet/column/reader.cc
+++ b/src/parquet/column/reader.cc
@@ -18,45 +18,62 @@
#include "parquet/column/reader.h"
#include <algorithm>
+#include <memory>
#include <string>
#include <string.h>
-#include "parquet/compression/codec.h"
-#include "parquet/encodings/encodings.h"
-#include "parquet/thrift/util.h"
-#include "parquet/util/input_stream.h"
+#include "parquet/column/page.h"
-const int DATA_PAGE_SIZE = 64 * 1024;
+#include "parquet/encodings/encodings.h"
namespace parquet_cpp {
-using parquet::CompressionCodec;
using parquet::Encoding;
using parquet::FieldRepetitionType;
using parquet::PageType;
using parquet::Type;
-ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata,
- const parquet::SchemaElement* schema, std::unique_ptr<InputStream> stream)
- : metadata_(metadata),
- schema_(schema),
- stream_(std::move(stream)),
+ColumnReader::ColumnReader(const parquet::SchemaElement* schema,
+ std::unique_ptr<PageReader> pager)
+ : schema_(schema),
+ pager_(std::move(pager)),
num_buffered_values_(0),
- num_decoded_values_(0) {
- switch (metadata->codec) {
- case CompressionCodec::UNCOMPRESSED:
- break;
- case CompressionCodec::SNAPPY:
- decompressor_.reset(new SnappyCodec());
- break;
- default:
- ParquetException::NYI("Reading compressed data");
+ num_decoded_values_(0) {}
+
+template <int TYPE>
+void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) {
+ auto it = decoders_.find(Encoding::RLE_DICTIONARY);
+ if (it != decoders_.end()) {
+ throw ParquetException("Column cannot have more than one dictionary.");
}
- config_ = Config::DefaultConfig();
+ PlainDecoder<TYPE> dictionary(schema_);
+ dictionary.SetData(page->num_values(), page->data(), page->size());
+
+ // The dictionary is fully decoded during DictionaryDecoder::Init, so the
+ // DictionaryPage buffer is no longer required after this step
+ //
+ // TODO(wesm): investigate whether this all-or-nothing decoding of the
+ // dictionary makes sense and whether performance can be improved
+ std::shared_ptr<DecoderType> decoder(
+ new DictionaryDecoder<TYPE>(schema_, &dictionary));
+
+ decoders_[Encoding::RLE_DICTIONARY] = decoder;
+ current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get();
}
+static size_t InitializeLevelDecoder(const uint8_t* buffer,
+ int16_t max_level, std::unique_ptr<RleDecoder>& decoder) {
+ int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer);
+
+ decoder.reset(new RleDecoder(buffer + sizeof(uint32_t),
+ num_definition_bytes,
+ BitUtil::NumRequiredBits(max_level)));
+
+ return sizeof(uint32_t) + num_definition_bytes;
+}
+
// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
// encoding.
static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
@@ -66,68 +83,44 @@ static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
template <int TYPE>
bool TypedColumnReader<TYPE>::ReadNewPage() {
// Loop until we find the next data page.
+ const uint8_t* buffer;
while (true) {
- int bytes_read = 0;
- const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read);
- if (bytes_read == 0) return false;
- uint32_t header_size = bytes_read;
- DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
- stream_->Read(header_size, &bytes_read);
-
- int compressed_len = current_page_header_.compressed_page_size;
- int uncompressed_len = current_page_header_.uncompressed_page_size;
-
- // Read the compressed data page.
- buffer = stream_->Read(compressed_len, &bytes_read);
- if (bytes_read != compressed_len) ParquetException::EofException();
-
- // Uncompress it if we need to
- if (decompressor_ != NULL) {
- // Grow the uncompressed buffer if we need to.
- if (uncompressed_len > decompression_buffer_.size()) {
- decompression_buffer_.resize(uncompressed_len);
- }
- decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
- &decompression_buffer_[0]);
- buffer = &decompression_buffer_[0];
+ current_page_ = pager_->NextPage();
+ if (!current_page_) {
+ // EOS
+ return false;
}
- if (current_page_header_.type == PageType::DICTIONARY_PAGE) {
- auto it = decoders_.find(Encoding::RLE_DICTIONARY);
- if (it != decoders_.end()) {
- throw ParquetException("Column cannot have more than one dictionary.");
- }
-
- PlainDecoder<TYPE> dictionary(schema_);
- dictionary.SetData(current_page_header_.dictionary_page_header.num_values,
- buffer, uncompressed_len);
- std::shared_ptr<DecoderType> decoder(
- new DictionaryDecoder<TYPE>(schema_, &dictionary));
-
- decoders_[Encoding::RLE_DICTIONARY] = decoder;
- current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get();
+ if (current_page_->type() == PageType::DICTIONARY_PAGE) {
+ ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
continue;
- } else if (current_page_header_.type == PageType::DATA_PAGE) {
+ } else if (current_page_->type() == PageType::DATA_PAGE) {
+ const DataPage* page = static_cast<const DataPage*>(current_page_.get());
+
// Read a data page.
- num_buffered_values_ = current_page_header_.data_page_header.num_values;
+ num_buffered_values_ = page->num_values();
// Have not decoded any values from the data page yet
num_decoded_values_ = 0;
+ buffer = page->data();
+
+ // If the data page includes repetition and definition levels, we
+ // initialize the level decoder and subtract the encoded level bytes from
+ // the page size to determine the number of bytes in the encoded data.
+ size_t data_size = page->size();
+
// Read definition levels.
if (schema_->repetition_type != FieldRepetitionType::REQUIRED) {
- int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer);
-
- // Temporary hack until schema resolution
+ // Temporary hack until schema resolution implemented
max_definition_level_ = 1;
- buffer += sizeof(uint32_t);
- definition_level_decoder_.reset(
- new RleDecoder(buffer, num_definition_bytes, 1));
- buffer += num_definition_bytes;
- uncompressed_len -= sizeof(uint32_t);
- uncompressed_len -= num_definition_bytes;
+ size_t def_levels_bytes = InitializeLevelDecoder(buffer,
+ max_definition_level_, definition_level_decoder_);
+
+ buffer += def_levels_bytes;
+ data_size -= def_levels_bytes;
} else {
// REQUIRED field
max_definition_level_ = 0;
@@ -137,7 +130,8 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
// Get a decoder object for this page or create a new decoder if this is the
// first page with this encoding.
- Encoding::type encoding = current_page_header_.data_page_header.encoding;
+ Encoding::type encoding = page->encoding();
+
if (IsDictionaryIndexEncoding(encoding)) encoding = Encoding::RLE_DICTIONARY;
auto it = decoders_.find(encoding);
@@ -163,10 +157,11 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
throw ParquetException("Unknown encoding type.");
}
}
- current_decoder_->SetData(num_buffered_values_, buffer, uncompressed_len);
+ current_decoder_->SetData(num_buffered_values_, buffer, data_size);
return true;
} else {
- // We don't know what this page type is. We're allowed to skip non-data pages.
+ // We don't know what this page type is. We're allowed to skip non-data
+ // pages.
continue;
}
}
@@ -206,27 +201,26 @@ size_t ColumnReader::ReadRepetitionLevels(size_t batch_size, int16_t* levels) {
// ----------------------------------------------------------------------
// Dynamic column reader constructor
-std::shared_ptr<ColumnReader> ColumnReader::Make(const parquet::ColumnMetaData* metadata,
- const parquet::SchemaElement* element, std::unique_ptr<InputStream> stream) {
- switch (metadata->type) {
+std::shared_ptr<ColumnReader> ColumnReader::Make(
+ const parquet::SchemaElement* element,
+ std::unique_ptr<PageReader> pager) {
+ switch (element->type) {
case Type::BOOLEAN:
- return std::make_shared<BoolReader>(metadata, element, std::move(stream));
+ return std::make_shared<BoolReader>(element, std::move(pager));
case Type::INT32:
- return std::make_shared<Int32Reader>(metadata, element, std::move(stream));
+ return std::make_shared<Int32Reader>(element, std::move(pager));
case Type::INT64:
- return std::make_shared<Int64Reader>(metadata, element, std::move(stream));
+ return std::make_shared<Int64Reader>(element, std::move(pager));
case Type::INT96:
- return std::make_shared<Int96Reader>(metadata, element, std::move(stream));
+ return std::make_shared<Int96Reader>(element, std::move(pager));
case Type::FLOAT:
- return std::make_shared<FloatReader>(metadata, element, std::move(stream));
+ return std::make_shared<FloatReader>(element, std::move(pager));
case Type::DOUBLE:
- return std::make_shared<DoubleReader>(metadata, element, std::move(stream));
+ return std::make_shared<DoubleReader>(element, std::move(pager));
case Type::BYTE_ARRAY:
- return std::make_shared<ByteArrayReader>(metadata, element,
- std::move(stream));
+ return std::make_shared<ByteArrayReader>(element, std::move(pager));
case Type::FIXED_LEN_BYTE_ARRAY:
- return std::make_shared<FixedLenByteArrayReader>(metadata, element,
- std::move(stream));
+ return std::make_shared<FixedLenByteArrayReader>(element, std::move(pager));
default:
ParquetException::NYI("type reader not implemented");
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
index 8f857c4..27ff678 100644
--- a/src/parquet/column/reader.h
+++ b/src/parquet/column/reader.h
@@ -28,9 +28,11 @@
#include "parquet/exception.h"
#include "parquet/types.h"
+
+#include "parquet/column/page.h"
+
#include "parquet/thrift/parquet_constants.h"
#include "parquet/thrift/parquet_types.h"
-#include "parquet/util/input_stream.h"
#include "parquet/encodings/encodings.h"
#include "parquet/util/rle-encoding.h"
@@ -52,21 +54,10 @@ class Scanner;
class ColumnReader {
public:
- struct Config {
- int batch_size;
-
- static Config DefaultConfig() {
- Config config;
- config.batch_size = 128;
- return config;
- }
- };
-
- ColumnReader(const parquet::ColumnMetaData*, const parquet::SchemaElement*,
- std::unique_ptr<InputStream> stream);
+ ColumnReader(const parquet::SchemaElement*, std::unique_ptr<PageReader>);
- static std::shared_ptr<ColumnReader> Make(const parquet::ColumnMetaData*,
- const parquet::SchemaElement*, std::unique_ptr<InputStream> stream);
+ static std::shared_ptr<ColumnReader> Make(const parquet::SchemaElement*,
+ std::unique_ptr<PageReader>);
// Returns true if there are still values in this column.
bool HasNext() {
@@ -81,11 +72,7 @@ class ColumnReader {
}
parquet::Type::type type() const {
- return metadata_->type;
- }
-
- const parquet::ColumnMetaData* metadata() const {
- return metadata_;
+ return schema_->type;
}
const parquet::SchemaElement* schema() const {
@@ -105,17 +92,10 @@ class ColumnReader {
// Returns the number of decoded repetition levels
size_t ReadRepetitionLevels(size_t batch_size, int16_t* levels);
- Config config_;
-
- const parquet::ColumnMetaData* metadata_;
const parquet::SchemaElement* schema_;
- std::unique_ptr<InputStream> stream_;
- // Compression codec to use.
- std::unique_ptr<Codec> decompressor_;
- std::vector<uint8_t> decompression_buffer_;
-
- parquet::PageHeader current_page_header_;
+ std::unique_ptr<PageReader> pager_;
+ std::shared_ptr<Page> current_page_;
// Not set if full schema for this field has no optional or repeated elements
std::unique_ptr<RleDecoder> definition_level_decoder_;
@@ -145,12 +125,10 @@ class TypedColumnReader : public ColumnReader {
public:
typedef typename type_traits<TYPE>::value_type T;
- TypedColumnReader(const parquet::ColumnMetaData* metadata,
- const parquet::SchemaElement* schema, std::unique_ptr<InputStream> stream) :
- ColumnReader(metadata, schema, std::move(stream)),
+ TypedColumnReader(const parquet::SchemaElement* schema,
+ std::unique_ptr<PageReader> pager) :
+ ColumnReader(schema, std::move(pager)),
current_decoder_(NULL) {
- size_t value_byte_size = type_traits<TYPE>::value_byte_size;
- values_buffer_.resize(config_.batch_size * value_byte_size);
}
// Read a batch of repetition levels, definition levels, and values from the
@@ -181,18 +159,20 @@ class TypedColumnReader : public ColumnReader {
// @returns: the number of values read into the out buffer
size_t ReadValues(size_t batch_size, T* out);
- // Map of compression type to decompressor object.
+ // Map of encoding type to the respective decoder object. For example, a
+ // column chunk's data pages may include both dictionary-encoded and
+ // plain-encoded data.
std::unordered_map<parquet::Encoding::type, std::shared_ptr<DecoderType> > decoders_;
+ void ConfigureDictionary(const DictionaryPage* page);
+
DecoderType* current_decoder_;
- std::vector<uint8_t> values_buffer_;
};
template <int TYPE>
inline size_t TypedColumnReader<TYPE>::ReadValues(size_t batch_size, T* out) {
size_t num_decoded = current_decoder_->Decode(out, batch_size);
- num_decoded_values_ += num_decoded;
return num_decoded;
}
@@ -212,9 +192,22 @@ inline size_t TypedColumnReader<TYPE>::ReadBatch(int batch_size, int16_t* def_le
size_t num_def_levels = 0;
size_t num_rep_levels = 0;
+ size_t values_to_read = 0;
+
// If the field is required and non-repeated, there are no definition levels
if (definition_level_decoder_) {
num_def_levels = ReadDefinitionLevels(batch_size, def_levels);
+
+ // TODO(wesm): this tallying of values-to-decode can be performed with better
+ // cache-efficiency if fused with the level decoding.
+ for (size_t i = 0; i < num_def_levels; ++i) {
+ if (def_levels[i] == max_definition_level_) {
+ ++values_to_read;
+ }
+ }
+ } else {
+ // Required field, read all values
+ values_to_read = batch_size;
}
// Not present for non-repeated fields
@@ -226,18 +219,11 @@ inline size_t TypedColumnReader<TYPE>::ReadBatch(int batch_size, int16_t* def_le
}
}
- // TODO(wesm): this tallying of values-to-decode can be performed with better
- // cache-efficiency if fused with the level decoding.
- size_t values_to_read = 0;
- for (size_t i = 0; i < num_def_levels; ++i) {
- if (def_levels[i] == max_definition_level_) {
- ++values_to_read;
- }
- }
-
*values_read = ReadValues(values_to_read, values);
+ size_t total_values = std::max(num_def_levels, *values_read);
+ num_decoded_values_ += total_values;
- return num_def_levels;
+ return total_values;
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/serialized-page.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/serialized-page.cc b/src/parquet/column/serialized-page.cc
new file mode 100644
index 0000000..1cbaf4d
--- /dev/null
+++ b/src/parquet/column/serialized-page.cc
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/column/serialized-page.h"
+
+#include <memory>
+
+#include "parquet/exception.h"
+#include "parquet/thrift/util.h"
+#include "parquet/util/input_stream.h"
+
+using parquet::PageType;
+
+namespace parquet_cpp {
+
+// ----------------------------------------------------------------------
+// SerializedPageReader deserializes Thrift metadata and pages that have been
+// assembled in a serialized stream for storing in a Parquet file
+
+SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream,
+ parquet::CompressionCodec::type codec) :
+ stream_(std::move(stream)) {
+ switch (codec) {
+ case parquet::CompressionCodec::UNCOMPRESSED:
+ break;
+ case parquet::CompressionCodec::SNAPPY:
+ decompressor_.reset(new SnappyCodec());
+ break;
+ default:
+ ParquetException::NYI("Reading compressed data");
+ }
+}
+
+// TODO(wesm): this may differ from file to file
+static constexpr int DATA_PAGE_SIZE = 64 * 1024;
+
+std::shared_ptr<Page> SerializedPageReader::NextPage() {
+ // Loop here because there may be unhandled page types that we skip until
+ // finding a page that we do know what to do with
+ while (true) {
+ int bytes_read = 0;
+ const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read);
+ if (bytes_read == 0) {
+ return std::shared_ptr<Page>(nullptr);
+ }
+
+ // This gets used, then set by DeserializeThriftMsg
+ uint32_t header_size = bytes_read;
+ DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
+
+ // Advance the stream offset
+ stream_->Read(header_size, &bytes_read);
+
+ int compressed_len = current_page_header_.compressed_page_size;
+ int uncompressed_len = current_page_header_.uncompressed_page_size;
+
+ // Read the compressed data page.
+ buffer = stream_->Read(compressed_len, &bytes_read);
+ if (bytes_read != compressed_len) ParquetException::EofException();
+
+ // Uncompress it if we need to
+ if (decompressor_ != NULL) {
+ // Grow the uncompressed buffer if we need to.
+ if (uncompressed_len > decompression_buffer_.size()) {
+ decompression_buffer_.resize(uncompressed_len);
+ }
+ decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
+ &decompression_buffer_[0]);
+ buffer = &decompression_buffer_[0];
+ }
+
+ if (current_page_header_.type == PageType::DICTIONARY_PAGE) {
+ return std::make_shared<DictionaryPage>(buffer, uncompressed_len,
+ current_page_header_.dictionary_page_header);
+ } else if (current_page_header_.type == PageType::DATA_PAGE) {
+ return std::make_shared<DataPage>(buffer, uncompressed_len,
+ current_page_header_.data_page_header);
+ } else if (current_page_header_.type == PageType::DATA_PAGE_V2) {
+ ParquetException::NYI("data page v2");
+ } else {
+ // We don't know what this page type is. We're allowed to skip non-data
+ // pages.
+ continue;
+ }
+ }
+ return std::shared_ptr<Page>(nullptr);
+}
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/serialized-page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/serialized-page.h b/src/parquet/column/serialized-page.h
new file mode 100644
index 0000000..2735c3c
--- /dev/null
+++ b/src/parquet/column/serialized-page.h
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines SerializedPageReader, a PageReader implementation that
+// iterates through the pages of a serialized Parquet column chunk, decoding
+// the Thrift page header that precedes each page.
+
+#ifndef PARQUET_COLUMN_SERIALIZED_PAGE_H
+#define PARQUET_COLUMN_SERIALIZED_PAGE_H
+
+#include <memory>
+#include <vector>
+
+#include "parquet/column/page.h"
+#include "parquet/compression/codec.h"
+#include "parquet/util/input_stream.h"
+#include "parquet/thrift/parquet_types.h"
+
+namespace parquet_cpp {
+
+// This subclass delimits pages appearing in a serialized stream, each preceded
+// by a serialized Thrift parquet::PageHeader indicating the type of each page
+// and the page metadata.
+class SerializedPageReader : public PageReader {
+ public:
+ // Takes ownership of the input stream that holds the serialized pages.
+ // NOTE(review): codec presumably selects the decompressor that ends up in
+ // decompressor_; the constructor body is not part of this header -- confirm.
+ SerializedPageReader(std::unique_ptr<InputStream> stream,
+ parquet::CompressionCodec::type codec);
+
+ virtual ~SerializedPageReader() {}
+
+ // Implement the PageReader interface
+ //
+ // Going by the implementation in the accompanying source file: returns the
+ // next dictionary page or data page, skips pages of unrecognized type, and
+ // returns a null shared_ptr once the stream has no more bytes. Calls
+ // ParquetException::EofException() when fewer bytes than the header's
+ // compressed_page_size can be read, and ParquetException::NYI() for
+ // DATA_PAGE_V2.
+ virtual std::shared_ptr<Page> NextPage();
+
+ private:
+ // Source of the serialized page headers and page bodies.
+ std::unique_ptr<InputStream> stream_;
+
+ // Header of the page most recently deserialized by NextPage().
+ parquet::PageHeader current_page_header_;
+ // NOTE(review): not referenced in the visible part of NextPage() -- confirm
+ // whether this member is still needed.
+ std::shared_ptr<Page> current_page_;
+
+ // Compression codec to use. A null pointer means pages are handed out
+ // without decompression.
+ std::unique_ptr<Codec> decompressor_;
+ // Scratch space for decompressed page bodies; grown when a page's
+ // uncompressed size exceeds it and reused by every NextPage() call.
+ // NOTE(review): pages are built from a pointer into this buffer, so unless
+ // Page copies its data (not visible here) a returned page is only valid
+ // until the next call -- confirm.
+ std::vector<uint8_t> decompression_buffer_;
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_COLUMN_SERIALIZED_PAGE_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
new file mode 100644
index 0000000..80f3fa1
--- /dev/null
+++ b/src/parquet/column/test-util.h
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines helpers for unit tests: a MockPageReader that serves
+// pre-built pages from memory, and a DataPageBuilder that assembles the
+// contents of a data page without creating a Parquet file.
+
+#ifndef PARQUET_COLUMN_TEST_UTIL_H
+#define PARQUET_COLUMN_TEST_UTIL_H
+
+#include <algorithm>
+#include <memory>
+#include <vector>
+
+#include "parquet/column/page.h"
+
+using parquet::Encoding;
+
+namespace parquet_cpp {
+
+namespace test {
+
+class MockPageReader : public PageReader {
+ public:
+  // Stores a copy of the given page list; NextPage() hands the pages out in
+  // their original order.
+  explicit MockPageReader(const std::vector<std::shared_ptr<Page> >& pages)
+      : pages_(pages), page_index_(0) {}
+
+  // Implement the PageReader interface: yields the next stored page, or a
+  // null pointer once every page has been handed out.
+  virtual std::shared_ptr<Page> NextPage() {
+    if (page_index_ != pages_.size()) {
+      const size_t current = page_index_;
+      ++page_index_;
+      return pages_[current];
+    }
+    // EOS to consumer
+    return std::shared_ptr<Page>(nullptr);
+  }
+
+ private:
+  std::vector<std::shared_ptr<Page> > pages_;
+  size_t page_index_;
+};
+
+// TODO(wesm): this is only used for testing for now
+
+static constexpr int DEFAULT_DATA_PAGE_SIZE = 64 * 1024;
+static constexpr int INIT_BUFFER_SIZE = 1024;
+
+// Builds the in-memory contents of a data page for tests.
+//
+// Level and value sections are appended to the caller-supplied byte vector in
+// the order the Append* methods are called; Finish() wraps the bytes written
+// so far in a DataPage. The vector owns the page's data, so it has to stay
+// alive (and unmodified) for as long as the returned page is used.
+template <int TYPE>
+class DataPageBuilder {
+ public:
+  typedef typename type_traits<TYPE>::value_type T;
+
+  // The passed vector is the owner of the page's data. It is resized to
+  // INIT_BUFFER_SIZE here and grown on demand by the Append* methods.
+  explicit DataPageBuilder(std::vector<uint8_t>* out) :
+      out_(out),
+      buffer_size_(0),
+      buffer_capacity_(INIT_BUFFER_SIZE),
+      num_values_(0),
+      have_def_levels_(false),
+      have_rep_levels_(false),
+      have_values_(false) {
+    out_->resize(INIT_BUFFER_SIZE);
+  }
+
+  // Appends RLE-encoded definition levels and records their encoding in the
+  // page header. Only Encoding::RLE is implemented.
+  void AppendDefLevels(const std::vector<int16_t>& levels,
+      int16_t max_level, parquet::Encoding::type encoding) {
+    AppendLevels(levels, max_level, encoding);
+
+    num_values_ = std::max(levels.size(), num_values_);
+    header_.__set_definition_level_encoding(encoding);
+    have_def_levels_ = true;
+  }
+
+  // Appends RLE-encoded repetition levels and records their encoding in the
+  // page header. Only Encoding::RLE is implemented.
+  void AppendRepLevels(const std::vector<int16_t>& levels,
+      int16_t max_level, parquet::Encoding::type encoding) {
+    AppendLevels(levels, max_level, encoding);
+
+    num_values_ = std::max(levels.size(), num_values_);
+    header_.__set_repetition_level_encoding(encoding);
+    have_rep_levels_ = true;
+  }
+
+  // Appends the values with the given encoding and records that encoding in
+  // the page header. Only Encoding::PLAIN is implemented.
+  void AppendValues(const std::vector<T>& values,
+      parquet::Encoding::type encoding) {
+    if (encoding != Encoding::PLAIN) {
+      ParquetException::NYI("only plain encoding currently implemented");
+    }
+    size_t bytes_to_encode = values.size() * sizeof(T);
+    Reserve(bytes_to_encode);
+
+    PlainEncoder<TYPE> encoder(nullptr);
+    // data() is well defined for an empty vector, unlike &values[0]
+    size_t nbytes = encoder.Encode(values.data(), values.size(), Head());
+    // In case for some reason it's fewer than bytes_to_encode
+    buffer_size_ += nbytes;
+
+    num_values_ = std::max(values.size(), num_values_);
+    header_.__set_encoding(encoding);
+    have_values_ = true;
+  }
+
+  // Stamps the value count into the header and returns a DataPage over the
+  // bytes appended so far. Throws ParquetException if AppendValues() was
+  // never called.
+  std::shared_ptr<Page> Finish() {
+    if (!have_values_) {
+      throw ParquetException("A data page must at least contain values");
+    }
+    header_.__set_num_values(num_values_);
+    return std::make_shared<DataPage>(out_->data(), buffer_size_, header_);
+  }
+
+ private:
+  std::vector<uint8_t>* out_;
+
+  // Bytes written so far, and the current size of *out_
+  size_t buffer_size_;
+  size_t buffer_capacity_;
+
+  parquet::DataPageHeader header_;
+
+  // Largest element count passed to any Append* call
+  size_t num_values_;
+
+  bool have_def_levels_;
+  bool have_rep_levels_;
+  bool have_values_;
+
+  // Ensures room for nbytes more bytes, doubling the capacity as often as
+  // needed but resizing the vector only once.
+  void Reserve(size_t nbytes) {
+    const size_t required = nbytes + buffer_size_;
+    if (required <= buffer_capacity_) {
+      return;
+    }
+    size_t new_capacity = buffer_capacity_;
+    while (new_capacity < required) {
+      new_capacity *= 2;
+    }
+    out_->resize(new_capacity);
+    buffer_capacity_ = new_capacity;
+  }
+
+  // Write position. Pointer arithmetic on data() stays valid even when the
+  // buffer is exactly full, where operator[] would index past the end.
+  uint8_t* Head() {
+    return out_->data() + buffer_size_;
+  }
+
+  // Used internally for both repetition and definition levels. Writes a
+  // 4-byte length prefix followed by the RLE-encoded levels.
+  void AppendLevels(const std::vector<int16_t>& levels, int16_t max_level,
+      parquet::Encoding::type encoding) {
+    if (encoding != Encoding::RLE) {
+      ParquetException::NYI("only rle encoding currently implemented");
+    }
+
+    // TODO: compute a more precise maximum size for the encoded levels
+    std::vector<uint8_t> encode_buffer(DEFAULT_DATA_PAGE_SIZE);
+
+    RleEncoder encoder(&encode_buffer[0], encode_buffer.size(),
+        BitUtil::NumRequiredBits(max_level));
+
+    // TODO(wesm): push down vector encoding
+    for (int16_t level : levels) {
+      if (!encoder.Put(level)) {
+        throw ParquetException("out of space");
+      }
+    }
+
+    const uint32_t rle_bytes = encoder.Flush();
+    const size_t levels_footprint = sizeof(uint32_t) + rle_bytes;
+    Reserve(levels_footprint);
+
+    // The write position has no alignment guarantee, so copy the length
+    // prefix bytewise instead of storing through a uint32_t pointer.
+    uint8_t* dst = Head();
+    memcpy(dst, &rle_bytes, sizeof(uint32_t));
+    memcpy(dst + sizeof(uint32_t), encoder.buffer(), rle_bytes);
+    buffer_size_ += levels_footprint;
+  }
+};
+
+} // namespace test
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_COLUMN_TEST_UTIL_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/encodings/encodings.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encodings.h b/src/parquet/encodings/encodings.h
index b30146a..4fb3d9a 100644
--- a/src/parquet/encodings/encodings.h
+++ b/src/parquet/encodings/encodings.h
@@ -67,6 +67,40 @@ class Decoder {
int num_values_;
};
+
+// Base class for value encoders. Since encoders may or not have state (e.g.,
+// dictionary encoding) we use a class instance to maintain any state.
+//
+// TODO(wesm): Encode interface API is temporary
+template <int TYPE>
+class Encoder {
+ public:
+  // C++ value type that corresponds to the Parquet physical type TYPE
+  typedef typename type_traits<TYPE>::value_type T;
+
+  virtual ~Encoder() {}
+
+  // TODO(wesm): use an output stream
+
+  // Fallback used when a subclass does not override Encode for its value
+  // type: it always throws ParquetException.
+  //
+  // @returns: the number of bytes written to dst
+  virtual size_t Encode(const T* src, int num_values, uint8_t* dst) {
+    throw ParquetException("Encoder does not implement this type.");
+    return 0;
+  }
+
+  // The encoding this encoder was constructed with
+  const parquet::Encoding::type encoding() const { return encoding_; }
+
+ protected:
+  // Only subclasses construct encoders; both arguments are stored as given.
+  explicit Encoder(const parquet::SchemaElement* schema,
+      const parquet::Encoding::type& encoding)
+      : schema_(schema),
+        encoding_(encoding) {}
+
+  // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
+  const parquet::SchemaElement* schema_;
+  const parquet::Encoding::type encoding_;
+};
+
} // namespace parquet_cpp
#include "parquet/encodings/plain-encoding.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index e8f8977..11e70c7 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -22,8 +22,13 @@
#include <algorithm>
+using parquet::Type;
+
namespace parquet_cpp {
+// ----------------------------------------------------------------------
+// Encoding::PLAIN decoder implementation
+
template <int TYPE>
class PlainDecoder : public Decoder<TYPE> {
public:
@@ -60,7 +65,7 @@ inline int PlainDecoder<TYPE>::Decode(T* buffer, int max_values) {
// Template specialization for BYTE_ARRAY
template <>
-inline int PlainDecoder<parquet::Type::BYTE_ARRAY>::Decode(ByteArray* buffer,
+inline int PlainDecoder<Type::BYTE_ARRAY>::Decode(ByteArray* buffer,
int max_values) {
max_values = std::min(max_values, num_values_);
for (int i = 0; i < max_values; ++i) {
@@ -76,7 +81,7 @@ inline int PlainDecoder<parquet::Type::BYTE_ARRAY>::Decode(ByteArray* buffer,
// Template specialization for FIXED_LEN_BYTE_ARRAY
template <>
-inline int PlainDecoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>::Decode(
+inline int PlainDecoder<Type::FIXED_LEN_BYTE_ARRAY>::Decode(
FixedLenByteArray* buffer, int max_values) {
max_values = std::min(max_values, num_values_);
int len = schema_->type_length;
@@ -91,10 +96,10 @@ inline int PlainDecoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>::Decode(
}
template <>
-class PlainDecoder<parquet::Type::BOOLEAN> : public Decoder<parquet::Type::BOOLEAN> {
+class PlainDecoder<Type::BOOLEAN> : public Decoder<Type::BOOLEAN> {
public:
explicit PlainDecoder(const parquet::SchemaElement* schema) :
- Decoder<parquet::Type::BOOLEAN>(schema, parquet::Encoding::PLAIN) {}
+ Decoder<Type::BOOLEAN>(schema, parquet::Encoding::PLAIN) {}
virtual void SetData(int num_values, const uint8_t* data, int len) {
num_values_ = num_values;
@@ -113,6 +118,49 @@ class PlainDecoder<parquet::Type::BOOLEAN> : public Decoder<parquet::Type::BOOLE
RleDecoder decoder_;
};
+// ----------------------------------------------------------------------
+// Encoding::PLAIN encoder implementation
+
+template <int TYPE>
+class PlainEncoder : public Encoder<TYPE> {
+ public:
+  typedef typename type_traits<TYPE>::value_type T;
+
+  // Forwards the schema element to the base class and tags this encoder as
+  // Encoding::PLAIN.
+  explicit PlainEncoder(const parquet::SchemaElement* schema)
+      : Encoder<TYPE>(schema, parquet::Encoding::PLAIN) {}
+
+  // Writes num_values values from src to dst; returns the bytes written.
+  virtual size_t Encode(const T* src, int num_values, uint8_t* dst);
+};
+
+// Generic case: the values are copied to dst byte for byte, so dst must have
+// room for num_values * sizeof(T) bytes.
+template <int TYPE>
+inline size_t PlainEncoder<TYPE>::Encode(const T* buffer, int num_values,
+    uint8_t* dst) {
+  const size_t value_width = sizeof(T);
+  const size_t total_bytes = num_values * value_width;
+  memcpy(dst, buffer, total_bytes);
+  return total_bytes;
+}
+
+// BOOLEAN values are not supported by the plain encoder yet.
+template <>
+inline size_t PlainEncoder<Type::BOOLEAN>::Encode(const bool* src,
+    int num_values, uint8_t* dst) {
+  ParquetException::NYI("bool encoding");
+  return 0;
+}
+
+// BYTE_ARRAY values are not supported by the plain encoder yet.
+template <>
+inline size_t PlainEncoder<Type::BYTE_ARRAY>::Encode(
+    const ByteArray* src, int num_values, uint8_t* dst) {
+  ParquetException::NYI("byte array encoding");
+  return 0;
+}
+
+// FIXED_LEN_BYTE_ARRAY values are not supported by the plain encoder yet.
+template <>
+inline size_t PlainEncoder<Type::FIXED_LEN_BYTE_ARRAY>::Encode(
+    const FixedLenByteArray* src, int num_values, uint8_t* dst) {
+  ParquetException::NYI("FLBA encoding");
+  return 0;
+}
+
} // namespace parquet_cpp
#endif
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader.cc b/src/parquet/reader.cc
index a43a2a5..a4e767e 100644
--- a/src/parquet/reader.cc
+++ b/src/parquet/reader.cc
@@ -25,7 +25,9 @@
#include <vector>
#include "parquet/column/reader.h"
+#include "parquet/column/serialized-page.h"
#include "parquet/column/scanner.h"
+
#include "parquet/exception.h"
#include "parquet/thrift/util.h"
#include "parquet/util/input_stream.h"
@@ -115,8 +117,13 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(size_t i) {
}
// TODO(wesm): This presumes a flat schema
- std::shared_ptr<ColumnReader> reader = ColumnReader::Make(&col.meta_data,
- &this->parent_->metadata_.schema[i + 1], std::move(input));
+ const parquet::SchemaElement* schema = &parent_->metadata_.schema[i + 1];
+
+ std::unique_ptr<PageReader> pager(
+ new SerializedPageReader(std::move(input), col.meta_data.codec));
+
+ std::shared_ptr<ColumnReader> reader = ColumnReader::Make(schema,
+ std::move(pager));
column_readers_[i] = reader;
return reader;
@@ -269,7 +276,7 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
size_t nColumns = group_reader->num_columns();
for (int c = 0; c < group_reader->num_columns(); ++c) {
- const parquet::ColumnMetaData* meta_data = group_reader->Column(c)->metadata();
+ const parquet::ColumnMetaData* meta_data = group_reader->column_metadata(c);
stream << "Column " << c
<< ": " << meta_data->num_values << " rows, "
<< meta_data->statistics.null_count << " null values, "
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/reader.h b/src/parquet/reader.h
index 4c92119..16927a7 100644
--- a/src/parquet/reader.h
+++ b/src/parquet/reader.h
@@ -83,6 +83,10 @@ class RowGroupReader {
// column. Ownership is shared with the RowGroupReader.
std::shared_ptr<ColumnReader> Column(size_t i);
+ // Returns a pointer to the metadata of the i-th column chunk of this row
+ // group. The index is not range-checked.
+ const parquet::ColumnMetaData* column_metadata(size_t i) const {
+   const auto& chunk = row_group_->columns[i];
+   return &chunk.meta_data;
+ }
+
size_t num_columns() const {
return row_group_->columns.size();
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/util/bit-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/bit-util.h b/src/parquet/util/bit-util.h
index 4db585a..7a2e921 100644
--- a/src/parquet/util/bit-util.h
+++ b/src/parquet/util/bit-util.h
@@ -276,6 +276,14 @@ class BitUtil {
static T UnsetBit(T v, int bitpos) {
return v & ~(static_cast<T>(0x1) << bitpos);
}
+
+ // Returns the minimum number of bits needed to represent the value of 'x':
+ // 0 for x == 0, otherwise the 1-based position of the highest set bit
+ // (so 64 when the top bit is set).
+ static inline int NumRequiredBits(uint64_t x) {
+   for (int i = 63; i >= 0; --i) {
+     // Shift a 64-bit unsigned one. A plain `1L` is only 32 bits wide on
+     // LLP64 platforms, and shifting a signed long into its sign bit is
+     // undefined behavior.
+     if (x & (static_cast<uint64_t>(1) << i)) return i + 1;
+   }
+   return 0;
+ }
};
} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/util/test-common.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/test-common.h b/src/parquet/util/test-common.h
new file mode 100644
index 0000000..38bc32c
--- /dev/null
+++ b/src/parquet/util/test-common.h
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_UTIL_TEST_COMMON_H
+#define PARQUET_UTIL_TEST_COMMON_H
+
+#include <iostream>
+#include <vector>
+
+using std::vector;
+
+namespace parquet_cpp {
+
+namespace test {
+
+// Element-wise comparison of two vectors for use in tests. Returns false when
+// the sizes differ; on the first differing element it reports the index and
+// both values on stderr and returns false.
+template <typename T>
+static inline bool vector_equal(const vector<T>& left, const vector<T>& right) {
+  const size_t length = left.size();
+  if (length != right.size()) {
+    return false;
+  }
+
+  size_t pos = 0;
+  while (pos < length) {
+    if (left[pos] != right[pos]) {
+      std::cerr << "index " << pos
+                << " left was " << left[pos]
+                << " right was " << right[pos]
+                << std::endl;
+      return false;
+    }
+    ++pos;
+  }
+
+  return true;
+}
+
+} // namespace test
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_UTIL_TEST_COMMON_H