You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2016/09/15 13:23:28 UTC
parquet-cpp git commit: PARQUET-718: Fix I/O of non-dictionary encoded pages
Repository: parquet-cpp
Updated Branches:
refs/heads/master c6f5ebe52 -> 0bf72a96b
PARQUET-718: Fix I/O of non-dictionary encoded pages
We set dictionary_page_offset even though no dictionary page was present; this caused too much data to be loaded into the SerializedPageReader.
Author: Uwe L. Korn <uw...@xhochy.com>
Closes #159 from xhochy/PARQUET-718 and squashes the following commits:
b3a7205 [Uwe L. Korn] PARQUET-718: Fix I/O of non-dictionary encoded pages
1e07b9f [Uwe L. Korn] PARQUET-718: Reading boolean pages fails
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/0bf72a96
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/0bf72a96
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/0bf72a96
Branch: refs/heads/master
Commit: 0bf72a96b5b9c6643decd8bd1e45fd8dc9422e33
Parents: c6f5ebe
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Thu Sep 15 09:23:20 2016 -0400
Committer: Wes McKinney <we...@apache.org>
Committed: Thu Sep 15 09:23:20 2016 -0400
----------------------------------------------------------------------
src/parquet/column/column-writer-test.cc | 96 +++-----------------------
src/parquet/column/test-specialization.h | 97 +++++++++++++++++++++++++++
src/parquet/file/file-serialize-test.cc | 73 +++++++++-----------
src/parquet/file/metadata.cc | 2 +-
4 files changed, 140 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0bf72a96/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index a87dc48..230a843 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -43,39 +43,17 @@ const int LARGE_SIZE = 10000;
const int VERY_LARGE_SIZE = 400000;
template <typename TestType>
-class TestPrimitiveWriter : public ::testing::Test {
+class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
public:
typedef typename TestType::c_type T;
- void SetUpSchemaRequired() {
- node_ = PrimitiveNode::Make("column", Repetition::REQUIRED, TestType::type_num,
- LogicalType::NONE, FLBA_LENGTH);
- schema_ = std::make_shared<ColumnDescriptor>(node_, 0, 0);
- }
-
- void SetUpSchemaOptional() {
- node_ = PrimitiveNode::Make("column", Repetition::OPTIONAL, TestType::type_num,
- LogicalType::NONE, FLBA_LENGTH);
- schema_ = std::make_shared<ColumnDescriptor>(node_, 1, 0);
- }
-
- void SetUpSchemaRepeated() {
- node_ = PrimitiveNode::Make("column", Repetition::REPEATED, TestType::type_num,
- LogicalType::NONE, FLBA_LENGTH);
- schema_ = std::make_shared<ColumnDescriptor>(node_, 1, 1);
- }
-
- void GenerateData(int64_t num_values);
-
- void SetupValuesOut();
-
void SetUp() {
- SetupValuesOut();
+ this->SetupValuesOut(SMALL_SIZE);
writer_properties_ = default_writer_properties();
definition_levels_out_.resize(SMALL_SIZE);
repetition_levels_out_.resize(SMALL_SIZE);
- SetUpSchemaRequired();
+ this->SetUpSchemaRequired();
}
Type::type type_num() { return TestType::type_num; }
@@ -85,14 +63,15 @@ class TestPrimitiveWriter : public ::testing::Test {
std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
std::unique_ptr<SerializedPageReader> page_reader(
new SerializedPageReader(std::move(source), Compression::UNCOMPRESSED));
- reader_.reset(new TypedColumnReader<TestType>(schema_.get(), std::move(page_reader)));
+ reader_.reset(
+ new TypedColumnReader<TestType>(this->descr_.get(), std::move(page_reader)));
}
std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter(
int64_t output_size = SMALL_SIZE, Encoding::type encoding = Encoding::PLAIN) {
sink_.reset(new InMemoryOutputStream());
- metadata_ = ColumnChunkMetaDataBuilder::Make(
- writer_properties_, schema_.get(), reinterpret_cast<uint8_t*>(&thrift_metadata_));
+ metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_.get(),
+ reinterpret_cast<uint8_t*>(&thrift_metadata_));
std::unique_ptr<SerializedPageWriter> pager(new SerializedPageWriter(
sink_.get(), Compression::UNCOMPRESSED, metadata_.get()));
WriterProperties::Builder wp_builder;
@@ -104,16 +83,15 @@ class TestPrimitiveWriter : public ::testing::Test {
}
writer_properties_ = wp_builder.build();
std::shared_ptr<ColumnWriter> writer = ColumnWriter::Make(
- schema_.get(), std::move(pager), output_size, writer_properties_.get());
+ this->descr_.get(), std::move(pager), output_size, writer_properties_.get());
return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
}
- void SyncValuesOut();
void ReadColumn() {
BuildReader();
- reader_->ReadBatch(values_out_.size(), definition_levels_out_.data(),
- repetition_levels_out_.data(), values_out_ptr_, &values_read_);
- SyncValuesOut();
+ reader_->ReadBatch(this->values_out_.size(), definition_levels_out_.data(),
+ repetition_levels_out_.data(), this->values_out_ptr_, &values_read_);
+ this->SyncValuesOut();
}
void TestRequiredWithEncoding(Encoding::type encoding) {
@@ -156,22 +134,10 @@ class TestPrimitiveWriter : public ::testing::Test {
// content is bound to the reader.
std::unique_ptr<TypedColumnReader<TestType>> reader_;
- // Input buffers
- std::vector<T> values_;
- std::vector<uint8_t> buffer_;
- // Pointer to the values, needed as we cannot use vector<bool>::data()
- T* values_ptr_;
- std::vector<uint8_t> bool_buffer_;
-
- // Output buffers
- std::vector<T> values_out_;
- std::vector<uint8_t> bool_buffer_out_;
- T* values_out_ptr_;
std::vector<int16_t> definition_levels_out_;
std::vector<int16_t> repetition_levels_out_;
private:
- NodePtr node_;
format::ColumnChunk thrift_metadata_;
std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
std::shared_ptr<ColumnDescriptor> schema_;
@@ -179,46 +145,6 @@ class TestPrimitiveWriter : public ::testing::Test {
std::shared_ptr<WriterProperties> writer_properties_;
};
-template <typename TestType>
-void TestPrimitiveWriter<TestType>::SetupValuesOut() {
- values_out_.resize(SMALL_SIZE);
- values_out_ptr_ = values_out_.data();
-}
-
-template <>
-void TestPrimitiveWriter<BooleanType>::SetupValuesOut() {
- values_out_.resize(SMALL_SIZE);
- bool_buffer_out_.resize(SMALL_SIZE);
- // Write once to all values so we can copy it without getting Valgrind errors
- // about uninitialised values.
- std::fill(bool_buffer_out_.begin(), bool_buffer_out_.end(), true);
- values_out_ptr_ = reinterpret_cast<bool*>(bool_buffer_out_.data());
-}
-
-template <typename TestType>
-void TestPrimitiveWriter<TestType>::SyncValuesOut() {}
-
-template <>
-void TestPrimitiveWriter<BooleanType>::SyncValuesOut() {
- std::copy(bool_buffer_out_.begin(), bool_buffer_out_.end(), values_out_.begin());
-}
-
-template <typename TestType>
-void TestPrimitiveWriter<TestType>::GenerateData(int64_t num_values) {
- values_.resize(num_values);
- InitValues<T>(num_values, values_, buffer_);
- values_ptr_ = values_.data();
-}
-
-template <>
-void TestPrimitiveWriter<BooleanType>::GenerateData(int64_t num_values) {
- values_.resize(num_values);
- InitValues<T>(num_values, values_, buffer_);
- bool_buffer_.resize(num_values);
- std::copy(values_.begin(), values_.end(), bool_buffer_.begin());
- values_ptr_ = reinterpret_cast<bool*>(bool_buffer_.data());
-}
-
typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
BooleanType, ByteArrayType, FLBAType> TestTypes;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0bf72a96/src/parquet/column/test-specialization.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-specialization.h b/src/parquet/column/test-specialization.h
index ab678b8..5803b65 100644
--- a/src/parquet/column/test-specialization.h
+++ b/src/parquet/column/test-specialization.h
@@ -22,6 +22,7 @@
#ifndef PARQUET_COLUMN_TEST_SPECIALIZATION_H
#define PARQUET_COLUMN_TEST_SPECIALIZATION_H
+#include <algorithm>
#include <limits>
#include <vector>
@@ -59,6 +60,102 @@ void InitValues<Int96>(int num_values, vector<Int96>& values, vector<uint8_t>& b
std::numeric_limits<int32_t>::max(), values.data());
}
+// This class lives here because of its dependency on the InitValues specializations.
+template <typename TestType>
+class PrimitiveTypedTest : public ::testing::Test {
+ public:
+ typedef typename TestType::c_type T;
+
+ void SetUpSchemaRequired() {
+ primitive_node_ = schema::PrimitiveNode::Make("column", Repetition::REQUIRED,
+ TestType::type_num, LogicalType::NONE, FLBA_LENGTH);
+ descr_ = std::make_shared<ColumnDescriptor>(primitive_node_, 0, 0);
+ node_ = schema::GroupNode::Make(
+ "schema", Repetition::REQUIRED, std::vector<schema::NodePtr>({primitive_node_}));
+ schema_.Init(node_);
+ }
+
+ void SetUpSchemaOptional() {
+ primitive_node_ = schema::PrimitiveNode::Make("column", Repetition::OPTIONAL,
+ TestType::type_num, LogicalType::NONE, FLBA_LENGTH);
+ descr_ = std::make_shared<ColumnDescriptor>(primitive_node_, 1, 0);
+ node_ = schema::GroupNode::Make(
+ "schema", Repetition::REQUIRED, std::vector<schema::NodePtr>({primitive_node_}));
+ schema_.Init(node_);
+ }
+
+ void SetUpSchemaRepeated() {
+ primitive_node_ = schema::PrimitiveNode::Make("column", Repetition::REPEATED,
+ TestType::type_num, LogicalType::NONE, FLBA_LENGTH);
+ descr_ = std::make_shared<ColumnDescriptor>(primitive_node_, 1, 1);
+ node_ = schema::GroupNode::Make(
+ "schema", Repetition::REQUIRED, std::vector<schema::NodePtr>({primitive_node_}));
+ schema_.Init(node_);
+ }
+
+ void GenerateData(int64_t num_values);
+ void SetupValuesOut(int64_t num_values);
+ void SyncValuesOut();
+ void SetUp() { SetUpSchemaRequired(); }
+
+ protected:
+ schema::NodePtr primitive_node_;
+ schema::NodePtr node_;
+ SchemaDescriptor schema_;
+ std::shared_ptr<ColumnDescriptor> descr_;
+
+ // Input buffers
+ std::vector<T> values_;
+ std::vector<uint8_t> buffer_;
+ // Pointer to the values, needed as we cannot use vector<bool>::data()
+ T* values_ptr_;
+ std::vector<uint8_t> bool_buffer_;
+
+ // Output buffers
+ std::vector<T> values_out_;
+ std::vector<uint8_t> bool_buffer_out_;
+ T* values_out_ptr_;
+};
+
+template <typename TestType>
+void PrimitiveTypedTest<TestType>::SyncValuesOut() {}
+
+template <>
+void PrimitiveTypedTest<BooleanType>::SyncValuesOut() {
+ std::copy(bool_buffer_out_.begin(), bool_buffer_out_.end(), values_out_.begin());
+}
+
+template <typename TestType>
+void PrimitiveTypedTest<TestType>::SetupValuesOut(int64_t num_values) {
+ values_out_.resize(num_values);
+ values_out_ptr_ = values_out_.data();
+}
+
+template <>
+void PrimitiveTypedTest<BooleanType>::SetupValuesOut(int64_t num_values) {
+ values_out_.resize(num_values);
+ bool_buffer_out_.resize(num_values);
+ // Write once to all values so we can copy it without getting Valgrind errors
+ // about uninitialised values.
+ std::fill(bool_buffer_out_.begin(), bool_buffer_out_.end(), true);
+ values_out_ptr_ = reinterpret_cast<bool*>(bool_buffer_out_.data());
+}
+
+template <typename TestType>
+void PrimitiveTypedTest<TestType>::GenerateData(int64_t num_values) {
+ values_.resize(num_values);
+ InitValues<T>(num_values, values_, buffer_);
+ values_ptr_ = values_.data();
+}
+
+template <>
+void PrimitiveTypedTest<BooleanType>::GenerateData(int64_t num_values) {
+ values_.resize(num_values);
+ InitValues<T>(num_values, values_, buffer_);
+ bool_buffer_.resize(num_values);
+ std::copy(values_.begin(), values_.end(), bool_buffer_.begin());
+ values_ptr_ = reinterpret_cast<bool*>(bool_buffer_.data());
+}
} // namespace test
} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0bf72a96/src/parquet/file/file-serialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc
index bd41e1e..90ee7de 100644
--- a/src/parquet/file/file-serialize-test.cc
+++ b/src/parquet/file/file-serialize-test.cc
@@ -18,6 +18,8 @@
#include <gtest/gtest.h>
#include "parquet/column/reader.h"
+#include "parquet/column/test-util.h"
+#include "parquet/column/test-specialization.h"
#include "parquet/column/writer.h"
#include "parquet/file/reader.h"
#include "parquet/file/writer.h"
@@ -33,45 +35,23 @@ using schema::PrimitiveNode;
namespace test {
-class TestSerialize : public ::testing::Test {
+template <typename TestType>
+class TestSerialize : public PrimitiveTypedTest<TestType> {
public:
- void SetUpSchemaRequired() {
- auto pnode = PrimitiveNode::Make("int64", Repetition::REQUIRED, Type::INT64);
- node_ =
- GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
- schema_.Init(node_);
- }
-
- void SetUpSchemaOptional() {
- auto pnode = PrimitiveNode::Make("int64", Repetition::OPTIONAL, Type::INT64);
- node_ =
- GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
- schema_.Init(node_);
- }
-
- void SetUpSchemaRepeated() {
- auto pnode = PrimitiveNode::Make("int64", Repetition::REPEATED, Type::INT64);
- node_ =
- GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
- schema_.Init(node_);
- }
-
- void SetUp() { SetUpSchemaRequired(); }
+ typedef typename TestType::c_type T;
protected:
- NodePtr node_;
- SchemaDescriptor schema_;
-
void FileSerializeTest(Compression::type codec_type) {
std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
- auto gnode = std::static_pointer_cast<GroupNode>(node_);
+ auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
std::shared_ptr<WriterProperties> writer_properties =
- WriterProperties::Builder().compression("schema.int64", codec_type)->build();
+ WriterProperties::Builder().compression("column", codec_type)->build();
auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
auto row_group_writer = file_writer->AppendRowGroup(100);
- auto column_writer = static_cast<Int64Writer*>(row_group_writer->NextColumn());
- std::vector<int64_t> values(100, 128);
- column_writer->WriteBatch(values.size(), nullptr, nullptr, values.data());
+ auto column_writer =
+ static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
+ this->GenerateData(100);
+ column_writer->WriteBatch(100, nullptr, nullptr, this->values_ptr_);
column_writer->Close();
row_group_writer->Close();
file_writer->Close();
@@ -86,29 +66,38 @@ class TestSerialize : public ::testing::Test {
auto rg_reader = file_reader->RowGroup(0);
ASSERT_EQ(1, rg_reader->metadata()->num_columns());
ASSERT_EQ(100, rg_reader->metadata()->num_rows());
+ // Check that the specified compression was actually used.
+ ASSERT_EQ(codec_type, rg_reader->metadata()->ColumnChunk(0)->compression());
- auto col_reader = std::static_pointer_cast<Int64Reader>(rg_reader->Column(0));
- std::vector<int64_t> values_out(100);
+ auto col_reader =
+ std::static_pointer_cast<TypedColumnReader<TestType>>(rg_reader->Column(0));
std::vector<int16_t> def_levels_out(100);
std::vector<int16_t> rep_levels_out(100);
int64_t values_read;
- col_reader->ReadBatch(values_out.size(), def_levels_out.data(), rep_levels_out.data(),
- values_out.data(), &values_read);
+ this->SetupValuesOut(100);
+ col_reader->ReadBatch(100, def_levels_out.data(), rep_levels_out.data(),
+ this->values_out_ptr_, &values_read);
+ this->SyncValuesOut();
ASSERT_EQ(100, values_read);
- ASSERT_EQ(values, values_out);
+ ASSERT_EQ(this->values_, this->values_out_);
}
};
-TEST_F(TestSerialize, SmallFileUncompressed) {
- FileSerializeTest(Compression::UNCOMPRESSED);
+typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
+ BooleanType, ByteArrayType, FLBAType> TestTypes;
+
+TYPED_TEST_CASE(TestSerialize, TestTypes);
+
+TYPED_TEST(TestSerialize, SmallFileUncompressed) {
+ this->FileSerializeTest(Compression::UNCOMPRESSED);
}
-TEST_F(TestSerialize, SmallFileSnappy) {
- FileSerializeTest(Compression::SNAPPY);
+TYPED_TEST(TestSerialize, SmallFileSnappy) {
+ this->FileSerializeTest(Compression::SNAPPY);
}
-TEST_F(TestSerialize, SmallFileGzip) {
- FileSerializeTest(Compression::GZIP);
+TYPED_TEST(TestSerialize, SmallFileGzip) {
+ this->FileSerializeTest(Compression::GZIP);
}
} // namespace test
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0bf72a96/src/parquet/file/metadata.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index 00ce990..9964882 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -355,13 +355,13 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size,
int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback) {
if (dictionary_page_offset > 0) {
+ column_chunk_->meta_data.__set_dictionary_page_offset(dictionary_page_offset);
column_chunk_->__set_file_offset(dictionary_page_offset + compressed_size);
} else {
column_chunk_->__set_file_offset(data_page_offset + compressed_size);
}
column_chunk_->__isset.meta_data = true;
column_chunk_->meta_data.__set_num_values(num_values);
- column_chunk_->meta_data.__set_dictionary_page_offset(dictionary_page_offset);
column_chunk_->meta_data.__set_index_page_offset(index_page_offset);
column_chunk_->meta_data.__set_data_page_offset(data_page_offset);
column_chunk_->meta_data.__set_total_uncompressed_size(uncompressed_size);