You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ju...@apache.org on 2016/02/16 00:55:24 UTC
[2/2] parquet-cpp git commit: PARQUET-446: Hide Thrift compiled
headers and Boost from public API, #include scrubbing
PARQUET-446: Hide Thrift compiled headers and Boost from public API, #include scrubbing
This is the completion of work I started in PARQUET-442. This also resolves PARQUET-277, as no Boost headers are included in the public API anymore.
I've done some scrubbing of #includes using Google's Clang-based include-what-you-use tool. PARQUET-522 can also be resolved when this is merged.
Author: Wes McKinney <we...@cloudera.com>
Closes #49 from wesm/PARQUET-446 and squashes the following commits:
e805a0c [Wes McKinney] Use int64_t for scanner batch sizes
503b1c1 [Wes McKinney] Fix mixed-up include guard names
4c02d2b [Wes McKinney] Refactor monolithic encodings/encodings.h
9e28fc3 [Wes McKinney] Finished IWYU pass. Some imported Impala code left unchanged for now
6d4af8e [Wes McKinney] Some initial IWYU
5be40d6 [Wes McKinney] Remove outdated TODO
2e39062 [Wes McKinney] Remove any boost #include dependencies
07059ca [Wes McKinney] Remove serialized-page.* files, move serialized-page-test to parquet/file
9458b36 [Wes McKinney] Add more headers to parquet.h public API
b4b0412 [Wes McKinney] Remove Thrift compiled headers from public API and general use outside of deserialization-related internal headers and code paths. Add unit test to enforce this
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/b71e826f
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/b71e826f
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/b71e826f
Branch: refs/heads/master
Commit: b71e826f0e3a392001124f5c730d2eaa29f5a44f
Parents: 05cd4ec
Author: Wes McKinney <we...@cloudera.com>
Authored: Mon Feb 15 15:55:18 2016 -0800
Committer: Julien Le Dem <ju...@dremio.com>
Committed: Mon Feb 15 15:55:18 2016 -0800
----------------------------------------------------------------------
CMakeLists.txt | 1 -
example/decode_benchmark.cc | 6 +-
src/parquet/CMakeLists.txt | 1 +
src/parquet/column/CMakeLists.txt | 2 -
src/parquet/column/column-reader-test.cc | 10 +-
src/parquet/column/levels-test.cc | 124 ++++++++-------
src/parquet/column/levels.h | 29 ++--
src/parquet/column/page.h | 149 +++++++++++++------
src/parquet/column/reader.cc | 32 ++--
src/parquet/column/reader.h | 16 +-
src/parquet/column/scanner.cc | 3 +-
src/parquet/column/scanner.h | 18 ++-
src/parquet/column/serialized-page-test.cc | 109 --------------
src/parquet/column/serialized-page.cc | 122 ---------------
src/parquet/column/serialized-page.h | 71 ---------
src/parquet/column/test-util.h | 93 +++++++-----
src/parquet/compression/lz4-codec.cc | 5 +-
src/parquet/compression/snappy-codec.cc | 4 +
src/parquet/encodings/CMakeLists.txt | 3 +-
src/parquet/encodings/decoder.h | 70 +++++++++
src/parquet/encodings/delta-bit-pack-encoding.h | 8 +-
.../encodings/delta-byte-array-encoding.h | 8 +-
.../delta-length-byte-array-encoding.h | 9 +-
src/parquet/encodings/dictionary-encoding.h | 9 +-
src/parquet/encodings/encoder.h | 61 ++++++++
src/parquet/encodings/encodings.h | 111 --------------
src/parquet/encodings/plain-encoding-test.cc | 8 +-
src/parquet/encodings/plain-encoding.h | 16 +-
src/parquet/file/CMakeLists.txt | 2 +
src/parquet/file/file-deserialize-test.cc | 111 ++++++++++++++
src/parquet/file/reader-internal.cc | 127 +++++++++++++++-
src/parquet/file/reader-internal.h | 49 +++++-
src/parquet/file/reader.cc | 6 +-
src/parquet/file/reader.h | 8 +-
src/parquet/parquet.h | 10 ++
src/parquet/public-api-test.cc | 29 ++++
src/parquet/reader-test.cc | 3 +-
src/parquet/schema/CMakeLists.txt | 3 +-
src/parquet/schema/converter.cc | 28 +---
src/parquet/schema/converter.h | 22 +--
src/parquet/schema/descriptor.h | 2 +
src/parquet/schema/printer.cc | 14 +-
src/parquet/schema/printer.h | 6 +-
src/parquet/schema/schema-converter-test.cc | 10 +-
src/parquet/schema/schema-descriptor-test.cc | 7 +-
src/parquet/schema/schema-printer-test.cc | 10 +-
src/parquet/schema/schema-types-test.cc | 11 +-
src/parquet/schema/types.cc | 23 +--
src/parquet/schema/types.h | 5 +-
src/parquet/thrift/serializer-test.cc | 17 +--
src/parquet/thrift/util.h | 30 +++-
src/parquet/types.h | 1 -
src/parquet/util/bit-util.h | 31 +++-
src/parquet/util/input.h | 1 +
src/parquet/util/output-test.cc | 6 +-
src/parquet/util/output.cc | 2 -
src/parquet/util/output.h | 1 -
57 files changed, 916 insertions(+), 757 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index ec7d66b..62182c4 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -296,7 +296,6 @@ endif()
# Library config
set(LIBPARQUET_SRCS
- src/parquet/column/serialized-page.cc
src/parquet/column/reader.cc
src/parquet/column/scanner.cc
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/example/decode_benchmark.cc
----------------------------------------------------------------------
diff --git a/example/decode_benchmark.cc b/example/decode_benchmark.cc
index b9076bf..ce16588 100644
--- a/example/decode_benchmark.cc
+++ b/example/decode_benchmark.cc
@@ -20,7 +20,11 @@
#include <stdio.h>
#include "parquet/compression/codec.h"
-#include "parquet/encodings/encodings.h"
+#include "parquet/encodings/plain-encoding.h"
+#include "parquet/encodings/dictionary-encoding.h"
+#include "parquet/encodings/delta-bit-pack-encoding.h"
+#include "parquet/encodings/delta-byte-array-encoding.h"
+#include "parquet/encodings/delta-length-byte-array-encoding.h"
#include "parquet/util/stopwatch.h"
using namespace parquet_cpp;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt
index 6a47917..97547ce 100644
--- a/src/parquet/CMakeLists.txt
+++ b/src/parquet/CMakeLists.txt
@@ -22,4 +22,5 @@ install(FILES
types.h
DESTINATION include/parquet)
+ADD_PARQUET_TEST(public-api-test)
ADD_PARQUET_TEST(reader-test)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/column/CMakeLists.txt b/src/parquet/column/CMakeLists.txt
index 32ec11c..99b4ed2 100644
--- a/src/parquet/column/CMakeLists.txt
+++ b/src/parquet/column/CMakeLists.txt
@@ -20,10 +20,8 @@ install(FILES
page.h
levels.h
reader.h
- serialized-page.h
scanner.h
DESTINATION include/parquet/column)
ADD_PARQUET_TEST(column-reader-test)
ADD_PARQUET_TEST(levels-test)
-ADD_PARQUET_TEST(serialized-page-test)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
index 84a36db..0abdf79 100644
--- a/src/parquet/column/column-reader-test.cc
+++ b/src/parquet/column/column-reader-test.cc
@@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.
+#include <algorithm>
#include <cstdint>
#include <cstdlib>
-#include <iostream>
-#include <sstream>
+#include <memory>
#include <string>
#include <vector>
@@ -28,15 +28,13 @@
#include "parquet/column/page.h"
#include "parquet/column/reader.h"
#include "parquet/column/test-util.h"
-
-#include "parquet/util/output.h"
+#include "parquet/schema/descriptor.h"
+#include "parquet/schema/types.h"
#include "parquet/util/test-common.h"
using std::string;
using std::vector;
using std::shared_ptr;
-using parquet::FieldRepetitionType;
-using parquet::SchemaElement;
namespace parquet_cpp {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/levels-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels-test.cc b/src/parquet/column/levels-test.cc
index 99cc21e..6061d23 100644
--- a/src/parquet/column/levels-test.cc
+++ b/src/parquet/column/levels-test.cc
@@ -15,98 +15,94 @@
// specific language governing permissions and limitations
// under the License.
-#include <cstdlib>
-#include <iostream>
-#include <sstream>
+#include <cstdint>
#include <string>
+#include <vector>
#include <gtest/gtest.h>
-#include "parquet/thrift/parquet_types.h"
#include "parquet/column/levels.h"
+#include "parquet/types.h"
using std::string;
namespace parquet_cpp {
-class TestLevels : public ::testing::Test {
- public:
- int GenerateLevels(int min_repeat_factor, int max_repeat_factor,
- int max_level, std::vector<int16_t>& input_levels) {
- int total_count = 0;
- // for each repetition count upto max_repeat_factor
- for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) {
- // repeat count increase by a factor of 2 for every iteration
- int repeat_count = (1 << repeat);
- // generate levels for repetition count upto the maximum level
- int value = 0;
- int bwidth = 0;
- while (value <= max_level) {
- for (int i = 0; i < repeat_count; i++) {
- input_levels[total_count++] = value;
- }
- value = (2 << bwidth) - 1;
- bwidth++;
+int GenerateLevels(int min_repeat_factor, int max_repeat_factor,
+ int max_level, std::vector<int16_t>& input_levels) {
+ int total_count = 0;
+ // for each repetition count upto max_repeat_factor
+ for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) {
+ // repeat count increase by a factor of 2 for every iteration
+ int repeat_count = (1 << repeat);
+ // generate levels for repetition count upto the maximum level
+ int value = 0;
+ int bwidth = 0;
+ while (value <= max_level) {
+ for (int i = 0; i < repeat_count; i++) {
+ input_levels[total_count++] = value;
}
+ value = (2 << bwidth) - 1;
+ bwidth++;
}
- return total_count;
}
+ return total_count;
+}
- void VerifyLevelsEncoding(parquet::Encoding::type encoding, int max_level,
- std::vector<int16_t>& input_levels) {
- LevelEncoder encoder;
- LevelDecoder decoder;
- int levels_count = 0;
- std::vector<int16_t> output_levels;
- std::vector<uint8_t> bytes;
- int num_levels = input_levels.size();
- output_levels.resize(num_levels);
- bytes.resize(2 * num_levels);
- ASSERT_EQ(num_levels, output_levels.size());
- ASSERT_EQ(2 * num_levels, bytes.size());
- // start encoding and decoding
- if (encoding == parquet::Encoding::RLE) {
- // leave space to write the rle length value
- encoder.Init(encoding, max_level, num_levels,
- bytes.data() + sizeof(uint32_t), bytes.size());
-
- levels_count = encoder.Encode(num_levels, input_levels.data());
- (reinterpret_cast<uint32_t*>(bytes.data()))[0] = encoder.len();
-
- } else {
- encoder.Init(encoding, max_level, num_levels,
- bytes.data(), bytes.size());
- levels_count = encoder.Encode(num_levels, input_levels.data());
- }
+void VerifyLevelsEncoding(Encoding::type encoding, int max_level,
+ std::vector<int16_t>& input_levels) {
+ LevelEncoder encoder;
+ LevelDecoder decoder;
+ int levels_count = 0;
+ std::vector<int16_t> output_levels;
+ std::vector<uint8_t> bytes;
+ int num_levels = input_levels.size();
+ output_levels.resize(num_levels);
+ bytes.resize(2 * num_levels);
+ ASSERT_EQ(num_levels, output_levels.size());
+ ASSERT_EQ(2 * num_levels, bytes.size());
+ // start encoding and decoding
+ if (encoding == Encoding::RLE) {
+ // leave space to write the rle length value
+ encoder.Init(encoding, max_level, num_levels,
+ bytes.data() + sizeof(uint32_t), bytes.size());
+
+ levels_count = encoder.Encode(num_levels, input_levels.data());
+ (reinterpret_cast<uint32_t*>(bytes.data()))[0] = encoder.len();
+
+ } else {
+ encoder.Init(encoding, max_level, num_levels,
+ bytes.data(), bytes.size());
+ levels_count = encoder.Encode(num_levels, input_levels.data());
+ }
- ASSERT_EQ(num_levels, levels_count);
+ ASSERT_EQ(num_levels, levels_count);
- decoder.Init(encoding, max_level, num_levels, bytes.data());
- levels_count = decoder.Decode(num_levels, output_levels.data());
+ decoder.Init(encoding, max_level, num_levels, bytes.data());
+ levels_count = decoder.Decode(num_levels, output_levels.data());
- ASSERT_EQ(num_levels, levels_count);
+ ASSERT_EQ(num_levels, levels_count);
- for (int i = 0; i < num_levels; i++) {
- EXPECT_EQ(input_levels[i], output_levels[i]);
- }
+ for (int i = 0; i < num_levels; i++) {
+ EXPECT_EQ(input_levels[i], output_levels[i]);
}
-};
+}
+
+TEST(TestLevels, TestEncodeDecodeLevels) {
+ // test levels with maximum bit-width from 1 to 8
+ // increase the repetition count for each iteration by a factor of 2
-// test levels with maximum bit-width from 1 to 8
-// increase the repetition count for each iteration by a factor of 2
-TEST_F(TestLevels, TestEncodeDecodeLevels) {
int min_repeat_factor = 0;
int max_repeat_factor = 7; // 128
int max_bit_width = 8;
std::vector<int16_t> input_levels;
- parquet::Encoding::type encodings[2] = {parquet::Encoding::RLE,
- parquet::Encoding::BIT_PACKED};
+ Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED};
// for each encoding
for (int encode = 0; encode < 2; encode++) {
- parquet::Encoding::type encoding = encodings[encode];
+ Encoding::type encoding = encodings[encode];
// BIT_PACKED requires a sequence of atleast 8
- if (encoding == parquet::Encoding::BIT_PACKED) min_repeat_factor = 3;
+ if (encoding == Encoding::BIT_PACKED) min_repeat_factor = 3;
// for each maximum bit-width
for (int bit_width = 1; bit_width <= max_bit_width; bit_width++) {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/levels.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels.h b/src/parquet/column/levels.h
index 4056223..18fd0bb 100644
--- a/src/parquet/column/levels.h
+++ b/src/parquet/column/levels.h
@@ -18,9 +18,10 @@
#ifndef PARQUET_COLUMN_LEVELS_H
#define PARQUET_COLUMN_LEVELS_H
+#include <memory>
+
#include "parquet/exception.h"
-#include "parquet/thrift/parquet_types.h"
-#include "parquet/encodings/encodings.h"
+#include "parquet/types.h"
#include "parquet/util/rle-encoding.h"
namespace parquet_cpp {
@@ -30,16 +31,16 @@ class LevelEncoder {
LevelEncoder() {}
// Initialize the LevelEncoder.
- void Init(parquet::Encoding::type encoding, int16_t max_level,
+ void Init(Encoding::type encoding, int16_t max_level,
int num_buffered_values, uint8_t* data, int data_size) {
bit_width_ = BitUtil::Log2(max_level + 1);
encoding_ = encoding;
switch (encoding) {
- case parquet::Encoding::RLE: {
+ case Encoding::RLE: {
rle_encoder_.reset(new RleEncoder(data, data_size, bit_width_));
break;
}
- case parquet::Encoding::BIT_PACKED: {
+ case Encoding::BIT_PACKED: {
int num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8);
bit_packed_encoder_.reset(new BitWriter(data, num_bytes));
break;
@@ -56,7 +57,7 @@ class LevelEncoder {
throw ParquetException("Level encoders are not initialized.");
}
- if (encoding_ == parquet::Encoding::RLE) {
+ if (encoding_ == Encoding::RLE) {
for (size_t i = 0; i < batch_size; ++i) {
if (!rle_encoder_->Put(*(levels + i))) {
break;
@@ -78,14 +79,16 @@ class LevelEncoder {
}
int32_t len() {
- assert(encoding_ == parquet::Encoding::RLE);
+ if (encoding_ != Encoding::RLE) {
+ throw ParquetException("Only implemented for RLE encoding");
+ }
return rle_length_;
}
private:
int bit_width_;
int rle_length_;
- parquet::Encoding::type encoding_;
+ Encoding::type encoding_;
std::unique_ptr<RleEncoder> rle_encoder_;
std::unique_ptr<BitWriter> bit_packed_encoder_;
};
@@ -96,20 +99,20 @@ class LevelDecoder {
LevelDecoder() {}
// Initialize the LevelDecoder and return the number of bytes consumed
- size_t Init(parquet::Encoding::type encoding, int16_t max_level,
+ size_t Init(Encoding::type encoding, int16_t max_level,
int num_buffered_values, const uint8_t* data) {
uint32_t num_bytes = 0;
uint32_t total_bytes = 0;
bit_width_ = BitUtil::Log2(max_level + 1);
encoding_ = encoding;
switch (encoding) {
- case parquet::Encoding::RLE: {
+ case Encoding::RLE: {
num_bytes = *reinterpret_cast<const uint32_t*>(data);
const uint8_t* decoder_data = data + sizeof(uint32_t);
rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_));
return sizeof(uint32_t) + num_bytes;
}
- case parquet::Encoding::BIT_PACKED: {
+ case Encoding::BIT_PACKED: {
num_bytes = BitUtil::Ceil(num_buffered_values * bit_width_, 8);
bit_packed_decoder_.reset(new BitReader(data, num_bytes));
return num_bytes;
@@ -127,7 +130,7 @@ class LevelDecoder {
throw ParquetException("Level decoders are not initialized.");
}
- if (encoding_ == parquet::Encoding::RLE) {
+ if (encoding_ == Encoding::RLE) {
for (size_t i = 0; i < batch_size; ++i) {
if (!rle_decoder_->Get(levels + i)) {
break;
@@ -147,7 +150,7 @@ class LevelDecoder {
private:
int bit_width_;
- parquet::Encoding::type encoding_;
+ Encoding::type encoding_;
std::unique_ptr<RleDecoder> rle_decoder_;
std::unique_ptr<BitReader> bit_packed_decoder_;
};
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
index f2740b6..3308a1c 100644
--- a/src/parquet/column/page.h
+++ b/src/parquet/column/page.h
@@ -22,32 +22,28 @@
#ifndef PARQUET_COLUMN_PAGE_H
#define PARQUET_COLUMN_PAGE_H
-#include "parquet/thrift/parquet_types.h"
+#include <cstdint>
+#include <memory>
+
+#include "parquet/types.h"
namespace parquet_cpp {
-// Note: Copying the specific page header Thrift metadata to the Page object
-// (instead of using a pointer) presently so that data pages can be
-// decompressed and processed in parallel. We can turn the header members of
-// these classes into pointers at some point, but the downside is that
-// applications materializing multiple data pages at once will have to have a
-// data container that manages the lifetime of the deserialized
-// parquet::PageHeader structs.
-//
// TODO: Parallel processing is not yet safe because of memory-ownership
// semantics (the PageReader may or may not own the memory referenced by a
// page)
+//
+// TODO(wesm): In the future Parquet implementations may store the crc code
+// in parquet::PageHeader. parquet-mr currently does not, so we also skip it
+// here, both on the read and write path
class Page {
- // TODO(wesm): In the future Parquet implementations may store the crc code
- // in parquet::PageHeader. parquet-mr currently does not, so we also skip it
- // here, both on the read and write path
public:
- Page(const uint8_t* buffer, size_t buffer_size, parquet::PageType::type type) :
+ Page(const uint8_t* buffer, int32_t buffer_size, PageType::type type) :
buffer_(buffer),
buffer_size_(buffer_size),
type_(type) {}
- parquet::PageType::type type() const {
+ PageType::type type() const {
return type_;
}
@@ -57,71 +53,138 @@ class Page {
}
// @returns: the total size in bytes of the page's data buffer
- size_t size() const {
+ int32_t size() const {
return buffer_size_;
}
private:
const uint8_t* buffer_;
- size_t buffer_size_;
+ int32_t buffer_size_;
- parquet::PageType::type type_;
+ PageType::type type_;
};
class DataPage : public Page {
public:
- DataPage(const uint8_t* buffer, size_t buffer_size,
- const parquet::DataPageHeader& header) :
- Page(buffer, buffer_size, parquet::PageType::DATA_PAGE),
- header_(header) {}
-
- size_t num_values() const {
- return header_.num_values;
+ DataPage(const uint8_t* buffer, int32_t buffer_size,
+ int32_t num_values, Encoding::type encoding,
+ Encoding::type definition_level_encoding,
+ Encoding::type repetition_level_encoding) :
+ Page(buffer, buffer_size, PageType::DATA_PAGE),
+ num_values_(num_values),
+ encoding_(encoding),
+ definition_level_encoding_(definition_level_encoding),
+ repetition_level_encoding_(repetition_level_encoding) {}
+
+ int32_t num_values() const {
+ return num_values_;
}
- parquet::Encoding::type encoding() const {
- return header_.encoding;
+ Encoding::type encoding() const {
+ return encoding_;
}
- parquet::Encoding::type repetition_level_encoding() const {
- return header_.repetition_level_encoding;
+ Encoding::type repetition_level_encoding() const {
+ return repetition_level_encoding_;
}
- parquet::Encoding::type definition_level_encoding() const {
- return header_.definition_level_encoding;
+ Encoding::type definition_level_encoding() const {
+ return definition_level_encoding_;
}
private:
- parquet::DataPageHeader header_;
+ int32_t num_values_;
+ Encoding::type encoding_;
+ Encoding::type definition_level_encoding_;
+ Encoding::type repetition_level_encoding_;
+
+ // TODO(wesm): parquet::DataPageHeader.statistics
};
class DataPageV2 : public Page {
public:
- DataPageV2(const uint8_t* buffer, size_t buffer_size,
- const parquet::DataPageHeaderV2& header) :
- Page(buffer, buffer_size, parquet::PageType::DATA_PAGE_V2),
- header_(header) {}
+ DataPageV2(const uint8_t* buffer, int32_t buffer_size,
+ int32_t num_values, int32_t num_nulls, int32_t num_rows,
+ Encoding::type encoding,
+ int32_t definition_levels_byte_length,
+ int32_t repetition_levels_byte_length, bool is_compressed = false) :
+ Page(buffer, buffer_size, PageType::DATA_PAGE_V2),
+ num_values_(num_values),
+ num_nulls_(num_nulls),
+ num_rows_(num_rows),
+ encoding_(encoding),
+ definition_levels_byte_length_(definition_levels_byte_length),
+ repetition_levels_byte_length_(repetition_levels_byte_length),
+ is_compressed_(is_compressed) {}
+
+ int32_t num_values() const {
+ return num_values_;
+ }
+
+ int32_t num_nulls() const {
+ return num_nulls_;
+ }
+
+ int32_t num_rows() const {
+ return num_rows_;
+ }
+
+ Encoding::type encoding() const {
+ return encoding_;
+ }
+
+ int32_t definition_levels_byte_length() const {
+ return definition_levels_byte_length_;
+ }
+
+ int32_t repetition_levels_byte_length() const {
+ return repetition_levels_byte_length_;
+ }
+
+ bool is_compressed() const {
+ return is_compressed_;
+ }
private:
- parquet::DataPageHeaderV2 header_;
+ int32_t num_values_;
+ int32_t num_nulls_;
+ int32_t num_rows_;
+ Encoding::type encoding_;
+ int32_t definition_levels_byte_length_;
+ int32_t repetition_levels_byte_length_;
+ bool is_compressed_;
+
+ // TODO(wesm): parquet::DataPageHeaderV2.statistics
};
class DictionaryPage : public Page {
public:
- DictionaryPage(const uint8_t* buffer, size_t buffer_size,
- const parquet::DictionaryPageHeader& header) :
- Page(buffer, buffer_size, parquet::PageType::DICTIONARY_PAGE),
- header_(header) {}
+ DictionaryPage(const uint8_t* buffer, int32_t buffer_size,
+ int32_t num_values, Encoding::type encoding, bool is_sorted = false) :
+ Page(buffer, buffer_size, PageType::DICTIONARY_PAGE),
+ num_values_(num_values),
+ encoding_(encoding),
+ is_sorted_(is_sorted) {}
+
+ int32_t num_values() const {
+ return num_values_;
+ }
+
+ Encoding::type encoding() const {
+ return encoding_;
+ }
- size_t num_values() const {
- return header_.num_values;
+ bool is_sorted() const {
+ return is_sorted_;
}
private:
- parquet::DictionaryPageHeader header_;
+ int32_t num_values_;
+ Encoding::type encoding_;
+ bool is_sorted_;
};
// Abstract page iterator interface. This way, we can feed column pages to the
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc
index 878bd4f..4ba0616 100644
--- a/src/parquet/column/reader.cc
+++ b/src/parquet/column/reader.cc
@@ -18,13 +18,13 @@
#include "parquet/column/reader.h"
#include <algorithm>
+#include <cstdint>
#include <memory>
-#include <string>
-#include <string.h>
#include "parquet/column/page.h"
-#include "parquet/encodings/encodings.h"
+#include "parquet/encodings/dictionary-encoding.h"
+#include "parquet/encodings/plain-encoding.h"
namespace parquet_cpp {
@@ -37,7 +37,7 @@ ColumnReader::ColumnReader(const ColumnDescriptor* descr,
template <int TYPE>
void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) {
- int encoding = static_cast<int>(parquet::Encoding::RLE_DICTIONARY);
+ int encoding = static_cast<int>(Encoding::RLE_DICTIONARY);
auto it = decoders_.find(encoding);
if (it != decoders_.end()) {
@@ -61,9 +61,9 @@ void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) {
// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
// encoding.
-static bool IsDictionaryIndexEncoding(const parquet::Encoding::type& e) {
- return e == parquet::Encoding::RLE_DICTIONARY ||
- e == parquet::Encoding::PLAIN_DICTIONARY;
+static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
+ return e == Encoding::RLE_DICTIONARY ||
+ e == Encoding::PLAIN_DICTIONARY;
}
template <int TYPE>
@@ -78,10 +78,10 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
return false;
}
- if (current_page_->type() == parquet::PageType::DICTIONARY_PAGE) {
+ if (current_page_->type() == PageType::DICTIONARY_PAGE) {
ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
continue;
- } else if (current_page_->type() == parquet::PageType::DATA_PAGE) {
+ } else if (current_page_->type() == PageType::DATA_PAGE) {
const DataPage* page = static_cast<const DataPage*>(current_page_.get());
// Read a data page.
@@ -123,10 +123,10 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
// Get a decoder object for this page or create a new decoder if this is the
// first page with this encoding.
- parquet::Encoding::type encoding = page->encoding();
+ Encoding::type encoding = page->encoding();
if (IsDictionaryIndexEncoding(encoding)) {
- encoding = parquet::Encoding::RLE_DICTIONARY;
+ encoding = Encoding::RLE_DICTIONARY;
}
auto it = decoders_.find(static_cast<int>(encoding));
@@ -134,18 +134,18 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
current_decoder_ = it->second.get();
} else {
switch (encoding) {
- case parquet::Encoding::PLAIN: {
+ case Encoding::PLAIN: {
std::shared_ptr<DecoderType> decoder(new PlainDecoder<TYPE>(descr_));
decoders_[static_cast<int>(encoding)] = decoder;
current_decoder_ = decoder.get();
break;
}
- case parquet::Encoding::RLE_DICTIONARY:
+ case Encoding::RLE_DICTIONARY:
throw ParquetException("Dictionary page must be before data page.");
- case parquet::Encoding::DELTA_BINARY_PACKED:
- case parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY:
- case parquet::Encoding::DELTA_BYTE_ARRAY:
+ case Encoding::DELTA_BINARY_PACKED:
+ case Encoding::DELTA_LENGTH_BYTE_ARRAY:
+ case Encoding::DELTA_BYTE_ARRAY:
ParquetException::NYI("Unsupported encoding");
default:
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
index 4585de8..d11a13c 100644
--- a/src/parquet/column/reader.h
+++ b/src/parquet/column/reader.h
@@ -22,25 +22,17 @@
#include <cstdint>
#include <cstring>
#include <memory>
-#include <string>
#include <unordered_map>
-#include <vector>
-
-#include "parquet/exception.h"
-#include "parquet/types.h"
+#include "parquet/column/levels.h"
#include "parquet/column/page.h"
-#include "parquet/encodings/encodings.h"
+#include "parquet/encodings/decoder.h"
+#include "parquet/exception.h"
#include "parquet/schema/descriptor.h"
-#include "parquet/util/rle-encoding.h"
-#include "parquet/column/levels.h"
+#include "parquet/types.h"
namespace parquet_cpp {
-
-class Codec;
-class Scanner;
-
class ColumnReader {
public:
ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/scanner.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner.cc b/src/parquet/column/scanner.cc
index 58f1460..4a0b32f 100644
--- a/src/parquet/column/scanner.cc
+++ b/src/parquet/column/scanner.cc
@@ -17,6 +17,7 @@
#include "parquet/column/scanner.h"
+#include <cstdint>
#include <memory>
#include "parquet/column/reader.h"
@@ -24,7 +25,7 @@
namespace parquet_cpp {
std::shared_ptr<Scanner> Scanner::Make(std::shared_ptr<ColumnReader> col_reader,
- size_t batch_size) {
+ int64_t batch_size) {
switch (col_reader->type()) {
case Type::BOOLEAN:
return std::make_shared<BoolScanner>(col_reader, batch_size);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/scanner.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner.h b/src/parquet/column/scanner.h
index 17fd5f6..512f540 100644
--- a/src/parquet/column/scanner.h
+++ b/src/parquet/column/scanner.h
@@ -18,24 +18,26 @@
#ifndef PARQUET_COLUMN_SCANNER_H
#define PARQUET_COLUMN_SCANNER_H
+#include <stdio.h>
+#include <cstdint>
#include <memory>
#include <ostream>
#include <string>
#include <vector>
#include "parquet/column/reader.h"
-
+#include "parquet/exception.h"
#include "parquet/schema/descriptor.h"
#include "parquet/types.h"
namespace parquet_cpp {
-static constexpr size_t DEFAULT_SCANNER_BATCH_SIZE = 128;
+static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE = 128;
class Scanner {
public:
explicit Scanner(std::shared_ptr<ColumnReader> reader,
- size_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) :
+ int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) :
batch_size_(batch_size),
level_offset_(0),
levels_buffered_(0),
@@ -50,7 +52,7 @@ class Scanner {
virtual ~Scanner() {}
static std::shared_ptr<Scanner> Make(std::shared_ptr<ColumnReader> col_reader,
- size_t batch_size = DEFAULT_SCANNER_BATCH_SIZE);
+ int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE);
virtual void PrintNext(std::ostream& out, int width) = 0;
@@ -62,14 +64,14 @@ class Scanner {
return reader_->descr();
}
- size_t batch_size() const { return batch_size_;}
+ int64_t batch_size() const { return batch_size_;}
- void SetBatchSize(size_t batch_size) {
+ void SetBatchSize(int64_t batch_size) {
batch_size_ = batch_size;
}
protected:
- size_t batch_size_;
+ int64_t batch_size_;
std::vector<int16_t> def_levels_;
std::vector<int16_t> rep_levels_;
@@ -91,7 +93,7 @@ class TypedScanner : public Scanner {
typedef typename type_traits<TYPE>::value_type T;
explicit TypedScanner(std::shared_ptr<ColumnReader> reader,
- size_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) :
+ int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) :
Scanner(reader, batch_size) {
typed_reader_ = static_cast<TypedColumnReader<TYPE>*>(reader.get());
size_t value_byte_size = type_traits<TYPE>::value_byte_size;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/serialized-page-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/serialized-page-test.cc b/src/parquet/column/serialized-page-test.cc
deleted file mode 100644
index 5c49021..0000000
--- a/src/parquet/column/serialized-page-test.cc
+++ /dev/null
@@ -1,109 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <cstdlib>
-#include <iostream>
-#include <sstream>
-#include <string>
-
-#include <gtest/gtest.h>
-
-#include "parquet/types.h"
-#include "parquet/thrift/parquet_types.h"
-#include "parquet/thrift/util.h"
-#include "parquet/column/serialized-page.h"
-#include "parquet/column/page.h"
-#include "parquet/column/reader.h"
-#include "parquet/column/test-util.h"
-
-
-namespace parquet_cpp {
-
-class TestSerializedPage : public ::testing::Test {
- public:
- void InitSerializedPageReader(const uint8_t* buffer, size_t header_size,
- parquet::CompressionCodec::type codec) {
- std::unique_ptr<InputStream> stream;
- stream.reset(new InMemoryInputStream(buffer, header_size));
- page_reader_.reset(new SerializedPageReader(std::move(stream), codec));
- }
-
- protected:
- std::unique_ptr<SerializedPageReader> page_reader_;
-};
-
-TEST_F(TestSerializedPage, TestLargePageHeaders) {
- parquet::PageHeader in_page_header;
- parquet::DataPageHeader data_page_header;
- parquet::PageHeader out_page_header;
- parquet::Statistics stats;
- int expected_header_size = 512 * 1024; //512 KB
- int stats_size = 256 * 1024; // 256 KB
- std::string serialized_buffer;
- int num_values = 4141;
-
- InitStats(stats_size, stats);
- InitDataPage(stats, data_page_header, num_values);
- InitPageHeader(data_page_header, in_page_header);
-
- // Serialize the Page header
- ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header,
- expected_header_size));
- // check header size is between 256 KB and 16 MB
- ASSERT_LE(stats_size, serialized_buffer.length());
- ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length());
-
- InitSerializedPageReader(reinterpret_cast<const uint8_t*>(serialized_buffer.c_str()),
- serialized_buffer.length(), parquet::CompressionCodec::UNCOMPRESSED);
-
- std::shared_ptr<Page> current_page = page_reader_->NextPage();
- ASSERT_EQ(parquet::PageType::DATA_PAGE, current_page->type());
- const DataPage* page = static_cast<const DataPage*>(current_page.get());
- ASSERT_EQ(num_values, page->num_values());
-}
-
-TEST_F(TestSerializedPage, TestFailLargePageHeaders) {
- parquet::PageHeader in_page_header;
- parquet::DataPageHeader data_page_header;
- parquet::PageHeader out_page_header;
- parquet::Statistics stats;
- int expected_header_size = 512 * 1024; // 512 KB
- int stats_size = 256 * 1024; // 256 KB
- int max_header_size = 128 * 1024; // 128 KB
- int num_values = 4141;
- std::string serialized_buffer;
-
- InitStats(stats_size, stats);
- InitDataPage(stats, data_page_header, num_values);
- InitPageHeader(data_page_header, in_page_header);
-
- // Serialize the Page header
- ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header,
- expected_header_size));
- // check header size is between 256 KB and 16 MB
- ASSERT_LE(stats_size, serialized_buffer.length());
- ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length());
-
- InitSerializedPageReader(reinterpret_cast<const uint8_t*>(serialized_buffer.c_str()),
- serialized_buffer.length(), parquet::CompressionCodec::UNCOMPRESSED);
-
- // Set the max page header size to 128 KB, which is less than the current header size
- page_reader_->set_max_page_header_size(max_header_size);
-
- ASSERT_THROW(page_reader_->NextPage(), ParquetException);
-}
-} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/serialized-page.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/serialized-page.cc b/src/parquet/column/serialized-page.cc
deleted file mode 100644
index 56b73a7..0000000
--- a/src/parquet/column/serialized-page.cc
+++ /dev/null
@@ -1,122 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/column/serialized-page.h"
-
-#include <memory>
-
-#include "parquet/exception.h"
-#include "parquet/thrift/util.h"
-
-using parquet::PageType;
-
-namespace parquet_cpp {
-
-// ----------------------------------------------------------------------
-// SerializedPageReader deserializes Thrift metadata and pages that have been
-// assembled in a serialized stream for storing in a Parquet file
-
-SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream,
- parquet::CompressionCodec::type codec) :
- stream_(std::move(stream)) {
- max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE;
- switch (codec) {
- case parquet::CompressionCodec::UNCOMPRESSED:
- break;
- case parquet::CompressionCodec::SNAPPY:
- decompressor_.reset(new SnappyCodec());
- break;
- default:
- ParquetException::NYI("Reading compressed data");
- }
-}
-
-
-std::shared_ptr<Page> SerializedPageReader::NextPage() {
- // Loop here because there may be unhandled page types that we skip until
- // finding a page that we do know what to do with
- while (true) {
- int64_t bytes_read = 0;
- int64_t bytes_available = 0;
- uint32_t header_size = 0;
- const uint8_t* buffer;
- uint32_t allowed_page_size = DEFAULT_PAGE_HEADER_SIZE;
- std::stringstream ss;
-
- // Page headers can be very large because of page statistics
- // We try to deserialize a larger buffer progressively
- // until a maximum allowed header limit
- while (true) {
- buffer = stream_->Peek(allowed_page_size, &bytes_available);
- if (bytes_available == 0) {
- return std::shared_ptr<Page>(nullptr);
- }
-
- // This gets used, then set by DeserializeThriftMsg
- header_size = bytes_available;
- try {
- DeserializeThriftMsg(buffer, &header_size, ¤t_page_header_);
- break;
- } catch (std::exception& e) {
- // Failed to deserialize. Double the allowed page header size and try again
- ss << e.what();
- allowed_page_size *= 2;
- if (allowed_page_size > max_page_header_size_) {
- ss << "Deserializing page header failed.\n";
- throw ParquetException(ss.str());
- }
- }
- }
- // Advance the stream offset
- stream_->Read(header_size, &bytes_read);
-
- int compressed_len = current_page_header_.compressed_page_size;
- int uncompressed_len = current_page_header_.uncompressed_page_size;
-
- // Read the compressed data page.
- buffer = stream_->Read(compressed_len, &bytes_read);
- if (bytes_read != compressed_len) ParquetException::EofException();
-
- // Uncompress it if we need to
- if (decompressor_ != NULL) {
- // Grow the uncompressed buffer if we need to.
- if (uncompressed_len > decompression_buffer_.size()) {
- decompression_buffer_.resize(uncompressed_len);
- }
- decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
- &decompression_buffer_[0]);
- buffer = &decompression_buffer_[0];
- }
-
- if (current_page_header_.type == PageType::DICTIONARY_PAGE) {
- return std::make_shared<DictionaryPage>(buffer, uncompressed_len,
- current_page_header_.dictionary_page_header);
- } else if (current_page_header_.type == PageType::DATA_PAGE) {
- return std::make_shared<DataPage>(buffer, uncompressed_len,
- current_page_header_.data_page_header);
- } else if (current_page_header_.type == PageType::DATA_PAGE_V2) {
- ParquetException::NYI("data page v2");
- } else {
- // We don't know what this page type is. We're allowed to skip non-data
- // pages.
- continue;
- }
- }
- return std::shared_ptr<Page>(nullptr);
-}
-
-} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/serialized-page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/serialized-page.h b/src/parquet/column/serialized-page.h
deleted file mode 100644
index 62bf66d..0000000
--- a/src/parquet/column/serialized-page.h
+++ /dev/null
@@ -1,71 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// This module defines an abstract interface for iterating through pages in a
-// Parquet column chunk within a row group. It could be extended in the future
-// to iterate through all data pages in all chunks in a file.
-
-#ifndef PARQUET_COLUMN_SERIALIZED_PAGE_H
-#define PARQUET_COLUMN_SERIALIZED_PAGE_H
-
-#include <memory>
-#include <vector>
-
-#include "parquet/column/page.h"
-#include "parquet/compression/codec.h"
-#include "parquet/util/input.h"
-#include "parquet/thrift/parquet_types.h"
-
-namespace parquet_cpp {
-
-// 16 MB is the default maximum page header size
-static constexpr uint32_t DEFAULT_MAX_PAGE_HEADER_SIZE = 16 * 1024 * 1024;
-// 16 KB is the default expected page header size
-static constexpr uint32_t DEFAULT_PAGE_HEADER_SIZE = 16 * 1024;
-// This subclass delimits pages appearing in a serialized stream, each preceded
-// by a serialized Thrift parquet::PageHeader indicating the type of each page
-// and the page metadata.
-class SerializedPageReader : public PageReader {
- public:
- SerializedPageReader(std::unique_ptr<InputStream> stream,
- parquet::CompressionCodec::type codec);
-
- virtual ~SerializedPageReader() {}
-
- // Implement the PageReader interface
- virtual std::shared_ptr<Page> NextPage();
-
- void set_max_page_header_size(uint32_t size) {
- max_page_header_size_ = size;
- }
-
- private:
- std::unique_ptr<InputStream> stream_;
-
- parquet::PageHeader current_page_header_;
- std::shared_ptr<Page> current_page_;
-
- // Compression codec to use.
- std::unique_ptr<Codec> decompressor_;
- std::vector<uint8_t> decompression_buffer_;
- // Maximum allowed page header size
- uint32_t max_page_header_size_;
-};
-
-} // namespace parquet_cpp
-
-#endif // PARQUET_COLUMN_SERIALIZED_PAGE_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
index 90dde3b..b346fc2 100644
--- a/src/parquet/column/test-util.h
+++ b/src/parquet/column/test-util.h
@@ -27,7 +27,14 @@
#include <vector>
#include <string>
+#include "parquet/column/levels.h"
#include "parquet/column/page.h"
+
+// Depended on by SerializedPageReader test utilities for now
+#include "parquet/encodings/plain-encoding.h"
+#include "parquet/thrift/util.h"
+#include "parquet/util/input.h"
+
namespace parquet_cpp {
namespace test {
@@ -61,36 +68,38 @@ class DataPageBuilder {
typedef typename type_traits<TYPE>::value_type T;
// This class writes data and metadata to the passed inputs
- explicit DataPageBuilder(InMemoryOutputStream* sink, parquet::DataPageHeader* header) :
+ explicit DataPageBuilder(InMemoryOutputStream* sink) :
sink_(sink),
- header_(header),
num_values_(0),
+ encoding_(Encoding::PLAIN),
+ definition_level_encoding_(Encoding::RLE),
+ repetition_level_encoding_(Encoding::RLE),
have_def_levels_(false),
have_rep_levels_(false),
have_values_(false) {
}
- void AppendDefLevels(const std::vector<int16_t>& levels,
- int16_t max_level, parquet::Encoding::type encoding) {
+ void AppendDefLevels(const std::vector<int16_t>& levels, int16_t max_level,
+ Encoding::type encoding = Encoding::RLE) {
AppendLevels(levels, max_level, encoding);
- num_values_ = std::max(levels.size(), num_values_);
- header_->__set_definition_level_encoding(encoding);
+ num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_);
+ definition_level_encoding_ = encoding;
have_def_levels_ = true;
}
- void AppendRepLevels(const std::vector<int16_t>& levels,
- int16_t max_level, parquet::Encoding::type encoding) {
+ void AppendRepLevels(const std::vector<int16_t>& levels, int16_t max_level,
+ Encoding::type encoding = Encoding::RLE) {
AppendLevels(levels, max_level, encoding);
- num_values_ = std::max(levels.size(), num_values_);
- header_->__set_repetition_level_encoding(encoding);
+ num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_);
+ repetition_level_encoding_ = encoding;
have_rep_levels_ = true;
}
void AppendValues(const std::vector<T>& values,
- parquet::Encoding::type encoding) {
- if (encoding != parquet::Encoding::PLAIN) {
+ Encoding::type encoding = Encoding::PLAIN) {
+ if (encoding != Encoding::PLAIN) {
ParquetException::NYI("only plain encoding currently implemented");
}
size_t bytes_to_encode = values.size() * sizeof(T);
@@ -98,31 +107,43 @@ class DataPageBuilder {
PlainEncoder<TYPE> encoder(nullptr);
encoder.Encode(&values[0], values.size(), sink_);
- num_values_ = std::max(values.size(), num_values_);
- header_->__set_encoding(encoding);
+ num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
+ encoding_ = encoding;
have_values_ = true;
}
- void Finish() {
- if (!have_values_) {
- throw ParquetException("A data page must at least contain values");
- }
- header_->__set_num_values(num_values_);
+ int32_t num_values() const {
+ return num_values_;
+ }
+
+ Encoding::type encoding() const {
+ return encoding_;
+ }
+
+ Encoding::type rep_level_encoding() const {
+ return repetition_level_encoding_;
+ }
+
+ Encoding::type def_level_encoding() const {
+ return definition_level_encoding_;
}
private:
InMemoryOutputStream* sink_;
- parquet::DataPageHeader* header_;
- size_t num_values_;
+ int32_t num_values_;
+ Encoding::type encoding_;
+ Encoding::type definition_level_encoding_;
+ Encoding::type repetition_level_encoding_;
+
bool have_def_levels_;
bool have_rep_levels_;
bool have_values_;
// Used internally for both repetition and definition levels
void AppendLevels(const std::vector<int16_t>& levels, int16_t max_level,
- parquet::Encoding::type encoding) {
- if (encoding != parquet::Encoding::RLE) {
+ Encoding::type encoding) {
+ if (encoding != Encoding::RLE) {
ParquetException::NYI("only rle encoding currently implemented");
}
@@ -152,32 +173,32 @@ static std::shared_ptr<DataPage> MakeDataPage(const std::vector<T>& values,
size_t num_values = values.size();
InMemoryOutputStream page_stream;
- parquet::DataPageHeader page_header;
-
- test::DataPageBuilder<TYPE> page_builder(&page_stream, &page_header);
+ test::DataPageBuilder<TYPE> page_builder(&page_stream);
if (!rep_levels.empty()) {
- page_builder.AppendRepLevels(rep_levels, max_rep_level,
- parquet::Encoding::RLE);
+ page_builder.AppendRepLevels(rep_levels, max_rep_level);
}
if (!def_levels.empty()) {
- page_builder.AppendDefLevels(def_levels, max_def_level,
- parquet::Encoding::RLE);
+ page_builder.AppendDefLevels(def_levels, max_def_level);
}
- page_builder.AppendValues(values, parquet::Encoding::PLAIN);
- page_builder.Finish();
-
- // Hand off the data stream to the passed std::vector
+ page_builder.AppendValues(values);
page_stream.Transfer(out_buffer);
- return std::make_shared<DataPage>(&(*out_buffer)[0], out_buffer->size(), page_header);
+ return std::make_shared<DataPage>(&(*out_buffer)[0], out_buffer->size(),
+ page_builder.num_values(),
+ page_builder.encoding(),
+ page_builder.def_level_encoding(),
+ page_builder.rep_level_encoding());
}
+
} // namespace test
+// Utilities for testing the SerializedPageReader internally
+
static inline void InitDataPage(const parquet::Statistics& stat,
- parquet::DataPageHeader& data_page, int nvalues) {
+ parquet::DataPageHeader& data_page, int32_t nvalues) {
data_page.encoding = parquet::Encoding::PLAIN;
data_page.definition_level_encoding = parquet::Encoding::RLE;
data_page.repetition_level_encoding = parquet::Encoding::RLE;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/compression/lz4-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/lz4-codec.cc b/src/parquet/compression/lz4-codec.cc
index dfd50f6..a131031 100644
--- a/src/parquet/compression/lz4-codec.cc
+++ b/src/parquet/compression/lz4-codec.cc
@@ -18,6 +18,9 @@
#include "parquet/compression/codec.h"
#include <lz4.h>
+#include <cstdint>
+
+#include "parquet/exception.h"
namespace parquet_cpp {
@@ -26,7 +29,7 @@ void Lz4Codec::Decompress(int64_t input_len, const uint8_t* input,
int64_t n = LZ4_decompress_fast(reinterpret_cast<const char*>(input),
reinterpret_cast<char*>(output_buffer), output_len);
if (n != input_len) {
- throw parquet_cpp::ParquetException("Corrupt lz4 compressed data.");
+ throw ParquetException("Corrupt lz4 compressed data.");
}
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/compression/snappy-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/snappy-codec.cc b/src/parquet/compression/snappy-codec.cc
index 4135a15..91590db 100644
--- a/src/parquet/compression/snappy-codec.cc
+++ b/src/parquet/compression/snappy-codec.cc
@@ -18,6 +18,10 @@
#include "parquet/compression/codec.h"
#include <snappy.h>
+#include <cstdint>
+#include <cstdlib>
+
+#include "parquet/exception.h"
namespace parquet_cpp {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/CMakeLists.txt b/src/parquet/encodings/CMakeLists.txt
index 638fba0..c9349af 100644
--- a/src/parquet/encodings/CMakeLists.txt
+++ b/src/parquet/encodings/CMakeLists.txt
@@ -17,7 +17,8 @@
# Headers: encodings
install(FILES
- encodings.h
+ decoder.h
+ encoder.h
delta-bit-pack-encoding.h
delta-byte-array-encoding.h
delta-length-byte-array-encoding.h
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/decoder.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/decoder.h b/src/parquet/encodings/decoder.h
new file mode 100644
index 0000000..55b29e8
--- /dev/null
+++ b/src/parquet/encodings/decoder.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ENCODINGS_DECODER_H
+#define PARQUET_ENCODINGS_DECODER_H
+
+#include <cstdint>
+
+#include "parquet/exception.h"
+#include "parquet/types.h"
+
+namespace parquet_cpp {
+
+class ColumnDescriptor;
+
+// The Decoder template is parameterized on parquet_cpp::Type::type
+template <int TYPE>
+class Decoder {
+ public:
+ typedef typename type_traits<TYPE>::value_type T;
+
+ virtual ~Decoder() {}
+
+ // Sets the data for a new page. This will be called multiple times on the same
+ // decoder and should reset all internal state.
+ virtual void SetData(int num_values, const uint8_t* data, int len) = 0;
+
+ // Subclasses should override the ones they support. In each of these functions,
+ // the decoder would decode up to 'max_values', storing the result in 'buffer'.
+ // The function returns the number of values decoded, which should be max_values
+ // except for end of the current data page.
+ virtual int Decode(T* buffer, int max_values) {
+ throw ParquetException("Decoder does not implement this type.");
+ }
+
+ // Returns the number of values left (for the last call to SetData()). This is
+ // the number of values left in this page.
+ int values_left() const { return num_values_; }
+
+ const Encoding::type encoding() const { return encoding_; }
+
+ protected:
+ explicit Decoder(const ColumnDescriptor* descr,
+ const Encoding::type& encoding)
+ : descr_(descr), encoding_(encoding), num_values_(0) {}
+
+ // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
+ const ColumnDescriptor* descr_;
+
+ const Encoding::type encoding_;
+ int num_values_;
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_ENCODINGS_DECODER_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/delta-bit-pack-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-bit-pack-encoding.h b/src/parquet/encodings/delta-bit-pack-encoding.h
index 4eb762b..d512db9 100644
--- a/src/parquet/encodings/delta-bit-pack-encoding.h
+++ b/src/parquet/encodings/delta-bit-pack-encoding.h
@@ -18,11 +18,13 @@
#ifndef PARQUET_DELTA_BIT_PACK_ENCODING_H
#define PARQUET_DELTA_BIT_PACK_ENCODING_H
-#include "parquet/encodings/encodings.h"
-
#include <algorithm>
+#include <cstdint>
#include <vector>
+#include "parquet/encodings/decoder.h"
+#include "parquet/util/bit-stream-utils.inline.h"
+
namespace parquet_cpp {
template <int TYPE>
@@ -31,7 +33,7 @@ class DeltaBitPackDecoder : public Decoder<TYPE> {
typedef typename type_traits<TYPE>::value_type T;
explicit DeltaBitPackDecoder(const ColumnDescriptor* descr)
- : Decoder<TYPE>(descr, parquet::Encoding::DELTA_BINARY_PACKED) {
+ : Decoder<TYPE>(descr, Encoding::DELTA_BINARY_PACKED) {
if (TYPE != Type::INT32 && TYPE != Type::INT64) {
throw ParquetException("Delta bit pack encoding should only be for integer data.");
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/delta-byte-array-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-byte-array-encoding.h b/src/parquet/encodings/delta-byte-array-encoding.h
index 2763f16..01dceea 100644
--- a/src/parquet/encodings/delta-byte-array-encoding.h
+++ b/src/parquet/encodings/delta-byte-array-encoding.h
@@ -18,16 +18,18 @@
#ifndef PARQUET_DELTA_BYTE_ARRAY_ENCODING_H
#define PARQUET_DELTA_BYTE_ARRAY_ENCODING_H
-#include "parquet/encodings/encodings.h"
-
#include <algorithm>
+#include "parquet/encodings/decoder.h"
+#include "parquet/encodings/delta-length-byte-array-encoding.h"
+#include "parquet/encodings/delta-bit-pack-encoding.h"
+
namespace parquet_cpp {
class DeltaByteArrayDecoder : public Decoder<Type::BYTE_ARRAY> {
public:
explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr)
- : Decoder<Type::BYTE_ARRAY>(descr, parquet::Encoding::DELTA_BYTE_ARRAY),
+ : Decoder<Type::BYTE_ARRAY>(descr, Encoding::DELTA_BYTE_ARRAY),
prefix_len_decoder_(nullptr),
suffix_decoder_(nullptr) {
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/delta-length-byte-array-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-length-byte-array-encoding.h b/src/parquet/encodings/delta-length-byte-array-encoding.h
index 0868924..a1b4fd3 100644
--- a/src/parquet/encodings/delta-length-byte-array-encoding.h
+++ b/src/parquet/encodings/delta-length-byte-array-encoding.h
@@ -18,9 +18,12 @@
#ifndef PARQUET_DELTA_LENGTH_BYTE_ARRAY_ENCODING_H
#define PARQUET_DELTA_LENGTH_BYTE_ARRAY_ENCODING_H
-#include "parquet/encodings/encodings.h"
-
#include <algorithm>
+#include <cstdint>
+#include <vector>
+
+#include "parquet/encodings/decoder.h"
+#include "parquet/encodings/delta-bit-pack-encoding.h"
namespace parquet_cpp {
@@ -28,7 +31,7 @@ class DeltaLengthByteArrayDecoder : public Decoder<Type::BYTE_ARRAY> {
public:
explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr)
: Decoder<Type::BYTE_ARRAY>(descr,
- parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY),
+ Encoding::DELTA_LENGTH_BYTE_ARRAY),
len_decoder_(nullptr) {
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/dictionary-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h
index 0547eb3..b52aefb 100644
--- a/src/parquet/encodings/dictionary-encoding.h
+++ b/src/parquet/encodings/dictionary-encoding.h
@@ -18,11 +18,14 @@
#ifndef PARQUET_DICTIONARY_ENCODING_H
#define PARQUET_DICTIONARY_ENCODING_H
-#include "parquet/encodings/encodings.h"
-
#include <algorithm>
+#include <cstdint>
#include <vector>
+#include "parquet/encodings/decoder.h"
+#include "parquet/encodings/encoder.h"
+#include "parquet/util/rle-encoding.h"
+
namespace parquet_cpp {
template <int TYPE>
@@ -35,7 +38,7 @@ class DictionaryDecoder : public Decoder<TYPE> {
// dictionary decoder needs to copy the data out if necessary.
DictionaryDecoder(const ColumnDescriptor* descr,
Decoder<TYPE>* dictionary)
- : Decoder<TYPE>(descr, parquet::Encoding::RLE_DICTIONARY) {
+ : Decoder<TYPE>(descr, Encoding::RLE_DICTIONARY) {
Init(dictionary);
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/encoder.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoder.h b/src/parquet/encodings/encoder.h
new file mode 100644
index 0000000..50ba48f
--- /dev/null
+++ b/src/parquet/encodings/encoder.h
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_ENCODINGS_ENCODER_H
+#define PARQUET_ENCODINGS_ENCODER_H
+
+#include <cstdint>
+
+#include "parquet/exception.h"
+#include "parquet/types.h"
+
+namespace parquet_cpp {
+
+class ColumnDescriptor;
+class OutputStream;
+
+// Base class for value encoders. Since encoders may or may not have state (e.g.,
+// dictionary encoding) we use a class instance to maintain any state.
+//
+// TODO(wesm): Encode interface API is temporary
+template <int TYPE>
+class Encoder {
+ public:
+ typedef typename type_traits<TYPE>::value_type T;
+
+ virtual ~Encoder() {}
+
+ // Subclasses should override the ones they support
+ virtual void Encode(const T* src, int num_values, OutputStream* dst) {
+ throw ParquetException("Encoder does not implement this type.");
+ }
+
+ const Encoding::type encoding() const { return encoding_; }
+
+ protected:
+ explicit Encoder(const ColumnDescriptor* descr,
+ const Encoding::type& encoding)
+ : descr_(descr), encoding_(encoding) {}
+
+ // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
+ const ColumnDescriptor* descr_;
+ const Encoding::type encoding_;
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_ENCODINGS_ENCODER_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/encodings.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encodings.h b/src/parquet/encodings/encodings.h
deleted file mode 100644
index 46c61b6..0000000
--- a/src/parquet/encodings/encodings.h
+++ /dev/null
@@ -1,111 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_ENCODINGS_ENCODINGS_H
-#define PARQUET_ENCODINGS_ENCODINGS_H
-
-#include <cstdint>
-
-#include "parquet/exception.h"
-#include "parquet/types.h"
-
-#include "parquet/util/output.h"
-#include "parquet/util/rle-encoding.h"
-#include "parquet/util/bit-stream-utils.inline.h"
-
-#include "parquet/schema/descriptor.h"
-
-#include "parquet/thrift/parquet_types.h"
-
-namespace parquet_cpp {
-
-// The Decoder template is parameterized on parquet_cpp::Type::type
-template <int TYPE>
-class Decoder {
- public:
- typedef typename type_traits<TYPE>::value_type T;
-
- virtual ~Decoder() {}
-
- // Sets the data for a new page. This will be called multiple times on the same
- // decoder and should reset all internal state.
- virtual void SetData(int num_values, const uint8_t* data, int len) = 0;
-
- // Subclasses should override the ones they support. In each of these functions,
- // the decoder would decode up to 'max_values', storing the result in 'buffer'.
- // The function returns the number of values decoded, which should be max_values
- // except for end of the current data page.
- virtual int Decode(T* buffer, int max_values) {
- throw ParquetException("Decoder does not implement this type.");
- }
-
- // Returns the number of values left (for the last call to SetData()). This is
- // the number of values left in this page.
- int values_left() const { return num_values_; }
-
- const parquet::Encoding::type encoding() const { return encoding_; }
-
- protected:
- explicit Decoder(const ColumnDescriptor* descr,
- const parquet::Encoding::type& encoding)
- : descr_(descr), encoding_(encoding), num_values_(0) {}
-
- // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
- const ColumnDescriptor* descr_;
-
- const parquet::Encoding::type encoding_;
- int num_values_;
-};
-
-
-// Base class for value encoders. Since encoders may or may not have state (e.g.,
-// dictionary encoding) we use a class instance to maintain any state.
-//
-// TODO(wesm): Encode interface API is temporary
-template <int TYPE>
-class Encoder {
- public:
- typedef typename type_traits<TYPE>::value_type T;
-
- virtual ~Encoder() {}
-
- // Subclasses should override the ones they support
- virtual void Encode(const T* src, int num_values, OutputStream* dst) {
- throw ParquetException("Encoder does not implement this type.");
- }
-
- const parquet::Encoding::type encoding() const { return encoding_; }
-
- protected:
- explicit Encoder(const ColumnDescriptor* descr,
- const parquet::Encoding::type& encoding)
- : descr_(descr), encoding_(encoding) {}
-
- // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
- const ColumnDescriptor* descr_;
- const parquet::Encoding::type encoding_;
-};
-
-} // namespace parquet_cpp
-
-#include "parquet/encodings/plain-encoding.h"
-#include "parquet/encodings/dictionary-encoding.h"
-#include "parquet/encodings/delta-bit-pack-encoding.h"
-#include "parquet/encodings/delta-length-byte-array-encoding.h"
-#include "parquet/encodings/delta-byte-array-encoding.h"
-
-#endif // PARQUET_ENCODINGS_ENCODINGS_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/plain-encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding-test.cc b/src/parquet/encodings/plain-encoding-test.cc
index 16862b8..955a415 100644
--- a/src/parquet/encodings/plain-encoding-test.cc
+++ b/src/parquet/encodings/plain-encoding-test.cc
@@ -16,13 +16,17 @@
// under the License.
#include <cstdint>
+#include <cstdlib>
#include <string>
#include <vector>
#include <gtest/gtest.h>
-#include "parquet/util/test-common.h"
-#include "parquet/encodings/encodings.h"
+#include "parquet/encodings/plain-encoding.h"
+#include "parquet/types.h"
+#include "parquet/util/bit-util.h"
+#include "parquet/util/output.h"
+#include "parquet/util/test-common.h"
using std::string;
using std::vector;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index a450eb4..78560fd 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -18,11 +18,15 @@
#ifndef PARQUET_PLAIN_ENCODING_H
#define PARQUET_PLAIN_ENCODING_H
-#include "parquet/encodings/encodings.h"
-
#include <algorithm>
#include <vector>
+#include "parquet/encodings/decoder.h"
+#include "parquet/encodings/encoder.h"
+#include "parquet/schema/descriptor.h"
+#include "parquet/util/bit-stream-utils.inline.h"
+#include "parquet/util/output.h"
+
namespace parquet_cpp {
// ----------------------------------------------------------------------
@@ -35,7 +39,7 @@ class PlainDecoder : public Decoder<TYPE> {
using Decoder<TYPE>::num_values_;
explicit PlainDecoder(const ColumnDescriptor* descr) :
- Decoder<TYPE>(descr, parquet::Encoding::PLAIN),
+ Decoder<TYPE>(descr, Encoding::PLAIN),
data_(NULL), len_(0) {}
virtual void SetData(int num_values, const uint8_t* data, int len) {
@@ -98,7 +102,7 @@ template <>
class PlainDecoder<Type::BOOLEAN> : public Decoder<Type::BOOLEAN> {
public:
explicit PlainDecoder(const ColumnDescriptor* descr) :
- Decoder<Type::BOOLEAN>(descr, parquet::Encoding::PLAIN) {}
+ Decoder<Type::BOOLEAN>(descr, Encoding::PLAIN) {}
virtual void SetData(int num_values, const uint8_t* data, int len) {
num_values_ = num_values;
@@ -145,7 +149,7 @@ class PlainEncoder : public Encoder<TYPE> {
typedef typename type_traits<TYPE>::value_type T;
explicit PlainEncoder(const ColumnDescriptor* descr) :
- Encoder<TYPE>(descr, parquet::Encoding::PLAIN) {}
+ Encoder<TYPE>(descr, Encoding::PLAIN) {}
virtual void Encode(const T* src, int num_values, OutputStream* dst);
};
@@ -154,7 +158,7 @@ template <>
class PlainEncoder<Type::BOOLEAN> : public Encoder<Type::BOOLEAN> {
public:
explicit PlainEncoder(const ColumnDescriptor* descr) :
- Encoder<Type::BOOLEAN>(descr, parquet::Encoding::PLAIN) {}
+ Encoder<Type::BOOLEAN>(descr, Encoding::PLAIN) {}
virtual void Encode(const bool* src, int num_values, OutputStream* dst) {
throw ParquetException("this API for encoding bools not implemented");
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/file/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/file/CMakeLists.txt b/src/parquet/file/CMakeLists.txt
index ef6ac01..b7a65f1 100644
--- a/src/parquet/file/CMakeLists.txt
+++ b/src/parquet/file/CMakeLists.txt
@@ -18,3 +18,5 @@
install(FILES
reader.h
DESTINATION include/parquet/file)
+
+ADD_PARQUET_TEST(file-deserialize-test)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/file/file-deserialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc
new file mode 100644
index 0000000..e90889d
--- /dev/null
+++ b/src/parquet/file/file-deserialize-test.cc
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <cstdlib>
+#include <cstdint>
+#include <exception>
+#include <memory>
+#include <string>
+
+#include "parquet/column/page.h"
+#include "parquet/column/test-util.h"
+
+#include "parquet/file/reader-internal.h"
+#include "parquet/thrift/parquet_types.h"
+#include "parquet/thrift/util.h"
+#include "parquet/types.h"
+#include "parquet/util/input.h"
+
+namespace parquet_cpp {
+
+class TestSerializedPage : public ::testing::Test {
+ public:
+ void InitSerializedPageReader(const uint8_t* buffer, size_t header_size,
+ Compression::type codec) {
+ std::unique_ptr<InputStream> stream;
+ stream.reset(new InMemoryInputStream(buffer, header_size));
+ page_reader_.reset(new SerializedPageReader(std::move(stream), codec));
+ }
+
+ protected:
+ std::unique_ptr<SerializedPageReader> page_reader_;
+};
+
+TEST_F(TestSerializedPage, TestLargePageHeaders) {
+ parquet::PageHeader in_page_header;
+ parquet::DataPageHeader data_page_header;
+ parquet::PageHeader out_page_header;
+ parquet::Statistics stats;
+ int expected_header_size = 512 * 1024; //512 KB
+ int stats_size = 256 * 1024; // 256 KB
+ std::string serialized_buffer;
+ int num_values = 4141;
+
+ InitStats(stats_size, stats);
+ InitDataPage(stats, data_page_header, num_values);
+ InitPageHeader(data_page_header, in_page_header);
+
+ // Serialize the Page header
+ ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header,
+ expected_header_size));
+ // check header size is between 256 KB to 16 MB
+ ASSERT_LE(stats_size, serialized_buffer.length());
+ ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length());
+
+ InitSerializedPageReader(reinterpret_cast<const uint8_t*>(serialized_buffer.c_str()),
+ serialized_buffer.length(), Compression::UNCOMPRESSED);
+
+ std::shared_ptr<Page> current_page = page_reader_->NextPage();
+ ASSERT_EQ(PageType::DATA_PAGE, current_page->type());
+ const DataPage* page = static_cast<const DataPage*>(current_page.get());
+ ASSERT_EQ(num_values, page->num_values());
+}
+
+TEST_F(TestSerializedPage, TestFailLargePageHeaders) {
+ parquet::PageHeader in_page_header;
+ parquet::DataPageHeader data_page_header;
+ parquet::PageHeader out_page_header;
+ parquet::Statistics stats;
+ int expected_header_size = 512 * 1024; // 512 KB
+ int stats_size = 256 * 1024; // 256 KB
+ int max_header_size = 128 * 1024; // 128 KB
+ int num_values = 4141;
+ std::string serialized_buffer;
+
+ InitStats(stats_size, stats);
+ InitDataPage(stats, data_page_header, num_values);
+ InitPageHeader(data_page_header, in_page_header);
+
+ // Serialize the Page header
+ ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header,
+ expected_header_size));
+ // check header size is between 256 KB to 16 MB
+ ASSERT_LE(stats_size, serialized_buffer.length());
+ ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length());
+
+ InitSerializedPageReader(reinterpret_cast<const uint8_t*>(serialized_buffer.c_str()),
+ serialized_buffer.length(), Compression::UNCOMPRESSED);
+
+ // Set the max page header size to 128 KB, which is less than the current header size
+ page_reader_->set_max_page_header_size(max_header_size);
+
+ ASSERT_THROW(page_reader_->NextPage(), ParquetException);
+}
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index 7b0a719..47092a5 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -17,17 +17,138 @@
#include "parquet/file/reader-internal.h"
-#include <memory>
+#include <string.h>
+#include <algorithm>
+#include <exception>
+#include <ostream>
#include <vector>
-#include "parquet/column/serialized-page.h"
+#include "parquet/column/page.h"
+#include "parquet/compression/codec.h"
+#include "parquet/exception.h"
#include "parquet/schema/converter.h"
+#include "parquet/schema/descriptor.h"
+#include "parquet/schema/types.h"
#include "parquet/thrift/util.h"
+#include "parquet/types.h"
#include "parquet/util/input.h"
namespace parquet_cpp {
// ----------------------------------------------------------------------
+// SerializedPageReader deserializes Thrift metadata and pages that have been
+// assembled in a serialized stream for storing in a Parquet files
+
+SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream,
+ Compression::type codec) :
+ stream_(std::move(stream)) {
+ max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE;
+ // TODO(wesm): add GZIP after PARQUET-456
+ switch (codec) {
+ case Compression::UNCOMPRESSED:
+ break;
+ case Compression::SNAPPY:
+ decompressor_.reset(new SnappyCodec());
+ break;
+ case Compression::LZO:
+ decompressor_.reset(new Lz4Codec());
+ break;
+ default:
+ ParquetException::NYI("Reading compressed data");
+ }
+}
+
+std::shared_ptr<Page> SerializedPageReader::NextPage() {
+ // Loop here because there may be unhandled page types that we skip until
+ // finding a page that we do know what to do with
+ while (true) {
+ int64_t bytes_read = 0;
+ int64_t bytes_available = 0;
+ uint32_t header_size = 0;
+ const uint8_t* buffer;
+ uint32_t allowed_page_size = DEFAULT_PAGE_HEADER_SIZE;
+ std::stringstream ss;
+
+ // Page headers can be very large because of page statistics
+ // We try to deserialize a larger buffer progressively
+ // until a maximum allowed header limit
+ while (true) {
+ buffer = stream_->Peek(allowed_page_size, &bytes_available);
+ if (bytes_available == 0) {
+ return std::shared_ptr<Page>(nullptr);
+ }
+
+ // This gets used, then set by DeserializeThriftMsg
+ header_size = bytes_available;
+ try {
+ DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
+ break;
+ } catch (std::exception& e) {
+ // Failed to deserialize. Double the allowed page header size and try again
+ ss << e.what();
+ allowed_page_size *= 2;
+ if (allowed_page_size > max_page_header_size_) {
+ ss << "Deserializing page header failed.\n";
+ throw ParquetException(ss.str());
+ }
+ }
+ }
+ // Advance the stream offset
+ stream_->Read(header_size, &bytes_read);
+
+ int compressed_len = current_page_header_.compressed_page_size;
+ int uncompressed_len = current_page_header_.uncompressed_page_size;
+
+ // Read the compressed data page.
+ buffer = stream_->Read(compressed_len, &bytes_read);
+ if (bytes_read != compressed_len) ParquetException::EofException();
+
+ // Uncompress it if we need to
+ if (decompressor_ != NULL) {
+ // Grow the uncompressed buffer if we need to.
+ if (uncompressed_len > decompression_buffer_.size()) {
+ decompression_buffer_.resize(uncompressed_len);
+ }
+ decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
+ &decompression_buffer_[0]);
+ buffer = &decompression_buffer_[0];
+ }
+
+ if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) {
+ const parquet::DictionaryPageHeader& dict_header =
+ current_page_header_.dictionary_page_header;
+
+ bool is_sorted = dict_header.__isset.is_sorted? dict_header.is_sorted : false;
+
+ return std::make_shared<DictionaryPage>(buffer, uncompressed_len,
+ dict_header.num_values, FromThrift(dict_header.encoding),
+ is_sorted);
+ } else if (current_page_header_.type == parquet::PageType::DATA_PAGE) {
+ const parquet::DataPageHeader& header = current_page_header_.data_page_header;
+
+ return std::make_shared<DataPage>(buffer, uncompressed_len,
+ header.num_values,
+ FromThrift(header.encoding),
+ FromThrift(header.definition_level_encoding),
+ FromThrift(header.repetition_level_encoding));
+ } else if (current_page_header_.type == parquet::PageType::DATA_PAGE_V2) {
+ const parquet::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
+ bool is_compressed = header.__isset.is_compressed? header.is_compressed : false;
+ return std::make_shared<DataPageV2>(buffer, uncompressed_len,
+ header.num_values, header.num_nulls, header.num_rows,
+ FromThrift(header.encoding),
+ header.definition_levels_byte_length,
+ header.repetition_levels_byte_length, is_compressed);
+ } else {
+ // We don't know what this page type is. We're allowed to skip non-data
+ // pages.
+ continue;
+ }
+ }
+ return std::shared_ptr<Page>(nullptr);
+}
+
+// ----------------------------------------------------------------------
// SerializedRowGroup
int SerializedRowGroup::num_columns() const {
@@ -62,7 +183,7 @@ std::unique_ptr<PageReader> SerializedRowGroup::GetColumnPageReader(int i) {
const ColumnDescriptor* descr = schema_->Column(i);
return std::unique_ptr<PageReader>(new SerializedPageReader(std::move(input),
- col.meta_data.codec));
+ FromThrift(col.meta_data.codec)));
}
RowGroupStatistics SerializedRowGroup::GetColumnStats(int i) {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/file/reader-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h
index 8ba105e..b7e9154 100644
--- a/src/parquet/file/reader-internal.h
+++ b/src/parquet/file/reader-internal.h
@@ -18,16 +18,57 @@
#ifndef PARQUET_FILE_READER_INTERNAL_H
#define PARQUET_FILE_READER_INTERNAL_H
-#include "parquet/file/reader.h"
-
+#include <cstdint>
#include <memory>
+#include <vector>
-#include "parquet/schema/descriptor.h"
-#include "parquet/util/input.h"
+#include "parquet/column/page.h"
+#include "parquet/compression/codec.h"
+#include "parquet/file/reader.h"
#include "parquet/thrift/parquet_types.h"
+#include "parquet/types.h"
+#include "parquet/util/input.h"
namespace parquet_cpp {
+class SchemaDescriptor;
+
+// 16 MB is the default maximum page header size
+static constexpr uint32_t DEFAULT_MAX_PAGE_HEADER_SIZE = 16 * 1024 * 1024;
+
+// 16 KB is the default expected page header size
+static constexpr uint32_t DEFAULT_PAGE_HEADER_SIZE = 16 * 1024;
+
+// This subclass delimits pages appearing in a serialized stream, each preceded
+// by a serialized Thrift parquet::PageHeader indicating the type of each page
+// and the page metadata.
+class SerializedPageReader : public PageReader {
+ public:
+ SerializedPageReader(std::unique_ptr<InputStream> stream,
+ Compression::type codec);
+
+ virtual ~SerializedPageReader() {}
+
+ // Implement the PageReader interface
+ virtual std::shared_ptr<Page> NextPage();
+
+ void set_max_page_header_size(uint32_t size) {
+ max_page_header_size_ = size;
+ }
+
+ private:
+ std::unique_ptr<InputStream> stream_;
+
+ parquet::PageHeader current_page_header_;
+ std::shared_ptr<Page> current_page_;
+
+ // Compression codec to use.
+ std::unique_ptr<Codec> decompressor_;
+ std::vector<uint8_t> decompression_buffer_;
+ // Maximum allowed page size
+ uint32_t max_page_header_size_;
+};
+
// RowGroupReader::Contents implementation for the Parquet file specification
class SerializedRowGroup : public RowGroupReader::Contents {
public:
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/file/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index 6ef59ed..9da0f09 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -18,17 +18,19 @@
#include "parquet/file/reader.h"
#include <cstdio>
-#include <cstring>
#include <memory>
#include <sstream>
#include <string>
+#include <utility>
#include <vector>
+#include "parquet/column/page.h"
#include "parquet/column/reader.h"
#include "parquet/column/scanner.h"
-
#include "parquet/exception.h"
#include "parquet/file/reader-internal.h"
+#include "parquet/util/input.h"
+#include "parquet/types.h"
using std::string;
using std::vector;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/file/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h
index 3ff8697..32ae429 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file/reader.h
@@ -19,21 +19,17 @@
#define PARQUET_FILE_READER_H
#include <cstdint>
+#include <iosfwd>
#include <memory>
#include <string>
-#include <stdio.h>
#include <unordered_map>
-#include "parquet/types.h"
-#include "parquet/schema/descriptor.h"
-
-// TODO(wesm): Still depends on Thrift
#include "parquet/column/page.h"
+#include "parquet/schema/descriptor.h"
namespace parquet_cpp {
class ColumnReader;
-class ParquetFileReader;
struct RowGroupStatistics {
int64_t num_values;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/parquet.h
----------------------------------------------------------------------
diff --git a/src/parquet/parquet.h b/src/parquet/parquet.h
index b8624ae..ea8ab5e 100644
--- a/src/parquet/parquet.h
+++ b/src/parquet/parquet.h
@@ -27,9 +27,19 @@
#include <vector>
#include "parquet/exception.h"
+
+// Column reader API
#include "parquet/column/reader.h"
+
+// File API
#include "parquet/file/reader.h"
+// Schemas
+#include "parquet/schema/descriptor.h"
+#include "parquet/schema/printer.h"
+#include "parquet/schema/types.h"
+
+// IO
#include "parquet/util/input.h"
#include "parquet/util/output.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/public-api-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/public-api-test.cc b/src/parquet/public-api-test.cc
new file mode 100644
index 0000000..4103714
--- /dev/null
+++ b/src/parquet/public-api-test.cc
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include "parquet/parquet.h"
+
+namespace parquet_cpp {
+
+TEST(TestPublicAPI, DoesNotIncludeThrift) {
+#ifdef _THRIFT_THRIFT_H_
+ FAIL() << "Thrift headers should not be in the public API";
+#endif
+}
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b71e826f/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index 8599e7e..97a5f79 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -16,9 +16,9 @@
// under the License.
#include <cstdlib>
+#include <cstdint>
#include <iostream>
#include <memory>
-#include <sstream>
#include <string>
#include <gtest/gtest.h>
@@ -26,7 +26,6 @@
#include "parquet/file/reader.h"
#include "parquet/column/reader.h"
#include "parquet/column/scanner.h"
-#include "parquet/util/input.h"
using std::string;