You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2016/09/15 13:23:28 UTC

parquet-cpp git commit: PARQUET-718: Fix I/O of non-dictionary encoded pages

Repository: parquet-cpp
Updated Branches:
  refs/heads/master c6f5ebe52 -> 0bf72a96b


PARQUET-718: Fix I/O of non-dictionary encoded pages

We set dictionary_page_offset even when no dictionary page was present, which caused too much data to be loaded into the SerializedPageReader.

Author: Uwe L. Korn <uw...@xhochy.com>

Closes #159 from xhochy/PARQUET-718 and squashes the following commits:

b3a7205 [Uwe L. Korn] PARQUET-718: Fix I/O of non-dictionary encoded pages
1e07b9f [Uwe L. Korn] PARQUET-718: Reading boolean pages fails


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/0bf72a96
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/0bf72a96
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/0bf72a96

Branch: refs/heads/master
Commit: 0bf72a96b5b9c6643decd8bd1e45fd8dc9422e33
Parents: c6f5ebe
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Thu Sep 15 09:23:20 2016 -0400
Committer: Wes McKinney <we...@apache.org>
Committed: Thu Sep 15 09:23:20 2016 -0400

----------------------------------------------------------------------
 src/parquet/column/column-writer-test.cc | 96 +++-----------------------
 src/parquet/column/test-specialization.h | 97 +++++++++++++++++++++++++++
 src/parquet/file/file-serialize-test.cc  | 73 +++++++++-----------
 src/parquet/file/metadata.cc             |  2 +-
 4 files changed, 140 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0bf72a96/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index a87dc48..230a843 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -43,39 +43,17 @@ const int LARGE_SIZE = 10000;
 const int VERY_LARGE_SIZE = 400000;
 
 template <typename TestType>
-class TestPrimitiveWriter : public ::testing::Test {
+class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
  public:
   typedef typename TestType::c_type T;
 
-  void SetUpSchemaRequired() {
-    node_ = PrimitiveNode::Make("column", Repetition::REQUIRED, TestType::type_num,
-        LogicalType::NONE, FLBA_LENGTH);
-    schema_ = std::make_shared<ColumnDescriptor>(node_, 0, 0);
-  }
-
-  void SetUpSchemaOptional() {
-    node_ = PrimitiveNode::Make("column", Repetition::OPTIONAL, TestType::type_num,
-        LogicalType::NONE, FLBA_LENGTH);
-    schema_ = std::make_shared<ColumnDescriptor>(node_, 1, 0);
-  }
-
-  void SetUpSchemaRepeated() {
-    node_ = PrimitiveNode::Make("column", Repetition::REPEATED, TestType::type_num,
-        LogicalType::NONE, FLBA_LENGTH);
-    schema_ = std::make_shared<ColumnDescriptor>(node_, 1, 1);
-  }
-
-  void GenerateData(int64_t num_values);
-
-  void SetupValuesOut();
-
   void SetUp() {
-    SetupValuesOut();
+    this->SetupValuesOut(SMALL_SIZE);
     writer_properties_ = default_writer_properties();
     definition_levels_out_.resize(SMALL_SIZE);
     repetition_levels_out_.resize(SMALL_SIZE);
 
-    SetUpSchemaRequired();
+    this->SetUpSchemaRequired();
   }
 
   Type::type type_num() { return TestType::type_num; }
@@ -85,14 +63,15 @@ class TestPrimitiveWriter : public ::testing::Test {
     std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
     std::unique_ptr<SerializedPageReader> page_reader(
         new SerializedPageReader(std::move(source), Compression::UNCOMPRESSED));
-    reader_.reset(new TypedColumnReader<TestType>(schema_.get(), std::move(page_reader)));
+    reader_.reset(
+        new TypedColumnReader<TestType>(this->descr_.get(), std::move(page_reader)));
   }
 
   std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter(
       int64_t output_size = SMALL_SIZE, Encoding::type encoding = Encoding::PLAIN) {
     sink_.reset(new InMemoryOutputStream());
-    metadata_ = ColumnChunkMetaDataBuilder::Make(
-        writer_properties_, schema_.get(), reinterpret_cast<uint8_t*>(&thrift_metadata_));
+    metadata_ = ColumnChunkMetaDataBuilder::Make(writer_properties_, this->descr_.get(),
+        reinterpret_cast<uint8_t*>(&thrift_metadata_));
     std::unique_ptr<SerializedPageWriter> pager(new SerializedPageWriter(
         sink_.get(), Compression::UNCOMPRESSED, metadata_.get()));
     WriterProperties::Builder wp_builder;
@@ -104,16 +83,15 @@ class TestPrimitiveWriter : public ::testing::Test {
     }
     writer_properties_ = wp_builder.build();
     std::shared_ptr<ColumnWriter> writer = ColumnWriter::Make(
-        schema_.get(), std::move(pager), output_size, writer_properties_.get());
+        this->descr_.get(), std::move(pager), output_size, writer_properties_.get());
     return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
   }
 
-  void SyncValuesOut();
   void ReadColumn() {
     BuildReader();
-    reader_->ReadBatch(values_out_.size(), definition_levels_out_.data(),
-        repetition_levels_out_.data(), values_out_ptr_, &values_read_);
-    SyncValuesOut();
+    reader_->ReadBatch(this->values_out_.size(), definition_levels_out_.data(),
+        repetition_levels_out_.data(), this->values_out_ptr_, &values_read_);
+    this->SyncValuesOut();
   }
 
   void TestRequiredWithEncoding(Encoding::type encoding) {
@@ -156,22 +134,10 @@ class TestPrimitiveWriter : public ::testing::Test {
   // content is bound to the reader.
   std::unique_ptr<TypedColumnReader<TestType>> reader_;
 
-  // Input buffers
-  std::vector<T> values_;
-  std::vector<uint8_t> buffer_;
-  // Pointer to the values, needed as we cannot use vector<bool>::data()
-  T* values_ptr_;
-  std::vector<uint8_t> bool_buffer_;
-
-  // Output buffers
-  std::vector<T> values_out_;
-  std::vector<uint8_t> bool_buffer_out_;
-  T* values_out_ptr_;
   std::vector<int16_t> definition_levels_out_;
   std::vector<int16_t> repetition_levels_out_;
 
  private:
-  NodePtr node_;
   format::ColumnChunk thrift_metadata_;
   std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
   std::shared_ptr<ColumnDescriptor> schema_;
@@ -179,46 +145,6 @@ class TestPrimitiveWriter : public ::testing::Test {
   std::shared_ptr<WriterProperties> writer_properties_;
 };
 
-template <typename TestType>
-void TestPrimitiveWriter<TestType>::SetupValuesOut() {
-  values_out_.resize(SMALL_SIZE);
-  values_out_ptr_ = values_out_.data();
-}
-
-template <>
-void TestPrimitiveWriter<BooleanType>::SetupValuesOut() {
-  values_out_.resize(SMALL_SIZE);
-  bool_buffer_out_.resize(SMALL_SIZE);
-  // Write once to all values so we can copy it without getting Valgrind errors
-  // about uninitialised values.
-  std::fill(bool_buffer_out_.begin(), bool_buffer_out_.end(), true);
-  values_out_ptr_ = reinterpret_cast<bool*>(bool_buffer_out_.data());
-}
-
-template <typename TestType>
-void TestPrimitiveWriter<TestType>::SyncValuesOut() {}
-
-template <>
-void TestPrimitiveWriter<BooleanType>::SyncValuesOut() {
-  std::copy(bool_buffer_out_.begin(), bool_buffer_out_.end(), values_out_.begin());
-}
-
-template <typename TestType>
-void TestPrimitiveWriter<TestType>::GenerateData(int64_t num_values) {
-  values_.resize(num_values);
-  InitValues<T>(num_values, values_, buffer_);
-  values_ptr_ = values_.data();
-}
-
-template <>
-void TestPrimitiveWriter<BooleanType>::GenerateData(int64_t num_values) {
-  values_.resize(num_values);
-  InitValues<T>(num_values, values_, buffer_);
-  bool_buffer_.resize(num_values);
-  std::copy(values_.begin(), values_.end(), bool_buffer_.begin());
-  values_ptr_ = reinterpret_cast<bool*>(bool_buffer_.data());
-}
-
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
     BooleanType, ByteArrayType, FLBAType> TestTypes;
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0bf72a96/src/parquet/column/test-specialization.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-specialization.h b/src/parquet/column/test-specialization.h
index ab678b8..5803b65 100644
--- a/src/parquet/column/test-specialization.h
+++ b/src/parquet/column/test-specialization.h
@@ -22,6 +22,7 @@
 #ifndef PARQUET_COLUMN_TEST_SPECIALIZATION_H
 #define PARQUET_COLUMN_TEST_SPECIALIZATION_H
 
+#include <algorithm>
 #include <limits>
 #include <vector>
 
@@ -59,6 +60,102 @@ void InitValues<Int96>(int num_values, vector<Int96>& values, vector<uint8_t>& b
       std::numeric_limits<int32_t>::max(), values.data());
 }
 
+// This class lives here because of its dependency on the InitValues specializations.
+template <typename TestType>
+class PrimitiveTypedTest : public ::testing::Test {
+ public:
+  typedef typename TestType::c_type T;
+
+  void SetUpSchemaRequired() {
+    primitive_node_ = schema::PrimitiveNode::Make("column", Repetition::REQUIRED,
+        TestType::type_num, LogicalType::NONE, FLBA_LENGTH);
+    descr_ = std::make_shared<ColumnDescriptor>(primitive_node_, 0, 0);
+    node_ = schema::GroupNode::Make(
+        "schema", Repetition::REQUIRED, std::vector<schema::NodePtr>({primitive_node_}));
+    schema_.Init(node_);
+  }
+
+  void SetUpSchemaOptional() {
+    primitive_node_ = schema::PrimitiveNode::Make("column", Repetition::OPTIONAL,
+        TestType::type_num, LogicalType::NONE, FLBA_LENGTH);
+    descr_ = std::make_shared<ColumnDescriptor>(primitive_node_, 1, 0);
+    node_ = schema::GroupNode::Make(
+        "schema", Repetition::REQUIRED, std::vector<schema::NodePtr>({primitive_node_}));
+    schema_.Init(node_);
+  }
+
+  void SetUpSchemaRepeated() {
+    primitive_node_ = schema::PrimitiveNode::Make("column", Repetition::REPEATED,
+        TestType::type_num, LogicalType::NONE, FLBA_LENGTH);
+    descr_ = std::make_shared<ColumnDescriptor>(primitive_node_, 1, 1);
+    node_ = schema::GroupNode::Make(
+        "schema", Repetition::REQUIRED, std::vector<schema::NodePtr>({primitive_node_}));
+    schema_.Init(node_);
+  }
+
+  void GenerateData(int64_t num_values);
+  void SetupValuesOut(int64_t num_values);
+  void SyncValuesOut();
+  void SetUp() { SetUpSchemaRequired(); }
+
+ protected:
+  schema::NodePtr primitive_node_;
+  schema::NodePtr node_;
+  SchemaDescriptor schema_;
+  std::shared_ptr<ColumnDescriptor> descr_;
+
+  // Input buffers
+  std::vector<T> values_;
+  std::vector<uint8_t> buffer_;
+  // Pointer to the values, needed as we cannot use vector<bool>::data()
+  T* values_ptr_;
+  std::vector<uint8_t> bool_buffer_;
+
+  // Output buffers
+  std::vector<T> values_out_;
+  std::vector<uint8_t> bool_buffer_out_;
+  T* values_out_ptr_;
+};
+
+template <typename TestType>
+void PrimitiveTypedTest<TestType>::SyncValuesOut() {}
+
+template <>
+void PrimitiveTypedTest<BooleanType>::SyncValuesOut() {
+  std::copy(bool_buffer_out_.begin(), bool_buffer_out_.end(), values_out_.begin());
+}
+
+template <typename TestType>
+void PrimitiveTypedTest<TestType>::SetupValuesOut(int64_t num_values) {
+  values_out_.resize(num_values);
+  values_out_ptr_ = values_out_.data();
+}
+
+template <>
+void PrimitiveTypedTest<BooleanType>::SetupValuesOut(int64_t num_values) {
+  values_out_.resize(num_values);
+  bool_buffer_out_.resize(num_values);
+  // Write once to all values so we can copy it without getting Valgrind errors
+  // about uninitialised values.
+  std::fill(bool_buffer_out_.begin(), bool_buffer_out_.end(), true);
+  values_out_ptr_ = reinterpret_cast<bool*>(bool_buffer_out_.data());
+}
+
+template <typename TestType>
+void PrimitiveTypedTest<TestType>::GenerateData(int64_t num_values) {
+  values_.resize(num_values);
+  InitValues<T>(num_values, values_, buffer_);
+  values_ptr_ = values_.data();
+}
+
+template <>
+void PrimitiveTypedTest<BooleanType>::GenerateData(int64_t num_values) {
+  values_.resize(num_values);
+  InitValues<T>(num_values, values_, buffer_);
+  bool_buffer_.resize(num_values);
+  std::copy(values_.begin(), values_.end(), bool_buffer_.begin());
+  values_ptr_ = reinterpret_cast<bool*>(bool_buffer_.data());
+}
 }  // namespace test
 
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0bf72a96/src/parquet/file/file-serialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc
index bd41e1e..90ee7de 100644
--- a/src/parquet/file/file-serialize-test.cc
+++ b/src/parquet/file/file-serialize-test.cc
@@ -18,6 +18,8 @@
 #include <gtest/gtest.h>
 
 #include "parquet/column/reader.h"
+#include "parquet/column/test-util.h"
+#include "parquet/column/test-specialization.h"
 #include "parquet/column/writer.h"
 #include "parquet/file/reader.h"
 #include "parquet/file/writer.h"
@@ -33,45 +35,23 @@ using schema::PrimitiveNode;
 
 namespace test {
 
-class TestSerialize : public ::testing::Test {
+template <typename TestType>
+class TestSerialize : public PrimitiveTypedTest<TestType> {
  public:
-  void SetUpSchemaRequired() {
-    auto pnode = PrimitiveNode::Make("int64", Repetition::REQUIRED, Type::INT64);
-    node_ =
-        GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
-    schema_.Init(node_);
-  }
-
-  void SetUpSchemaOptional() {
-    auto pnode = PrimitiveNode::Make("int64", Repetition::OPTIONAL, Type::INT64);
-    node_ =
-        GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
-    schema_.Init(node_);
-  }
-
-  void SetUpSchemaRepeated() {
-    auto pnode = PrimitiveNode::Make("int64", Repetition::REPEATED, Type::INT64);
-    node_ =
-        GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
-    schema_.Init(node_);
-  }
-
-  void SetUp() { SetUpSchemaRequired(); }
+  typedef typename TestType::c_type T;
 
  protected:
-  NodePtr node_;
-  SchemaDescriptor schema_;
-
   void FileSerializeTest(Compression::type codec_type) {
     std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
-    auto gnode = std::static_pointer_cast<GroupNode>(node_);
+    auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
     std::shared_ptr<WriterProperties> writer_properties =
-        WriterProperties::Builder().compression("schema.int64", codec_type)->build();
+        WriterProperties::Builder().compression("column", codec_type)->build();
     auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
     auto row_group_writer = file_writer->AppendRowGroup(100);
-    auto column_writer = static_cast<Int64Writer*>(row_group_writer->NextColumn());
-    std::vector<int64_t> values(100, 128);
-    column_writer->WriteBatch(values.size(), nullptr, nullptr, values.data());
+    auto column_writer =
+        static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
+    this->GenerateData(100);
+    column_writer->WriteBatch(100, nullptr, nullptr, this->values_ptr_);
     column_writer->Close();
     row_group_writer->Close();
     file_writer->Close();
@@ -86,29 +66,38 @@ class TestSerialize : public ::testing::Test {
     auto rg_reader = file_reader->RowGroup(0);
     ASSERT_EQ(1, rg_reader->metadata()->num_columns());
     ASSERT_EQ(100, rg_reader->metadata()->num_rows());
+    // Check that the specified compression was actually used.
+    ASSERT_EQ(codec_type, rg_reader->metadata()->ColumnChunk(0)->compression());
 
-    auto col_reader = std::static_pointer_cast<Int64Reader>(rg_reader->Column(0));
-    std::vector<int64_t> values_out(100);
+    auto col_reader =
+        std::static_pointer_cast<TypedColumnReader<TestType>>(rg_reader->Column(0));
     std::vector<int16_t> def_levels_out(100);
     std::vector<int16_t> rep_levels_out(100);
     int64_t values_read;
-    col_reader->ReadBatch(values_out.size(), def_levels_out.data(), rep_levels_out.data(),
-        values_out.data(), &values_read);
+    this->SetupValuesOut(100);
+    col_reader->ReadBatch(100, def_levels_out.data(), rep_levels_out.data(),
+        this->values_out_ptr_, &values_read);
+    this->SyncValuesOut();
     ASSERT_EQ(100, values_read);
-    ASSERT_EQ(values, values_out);
+    ASSERT_EQ(this->values_, this->values_out_);
   }
 };
 
-TEST_F(TestSerialize, SmallFileUncompressed) {
-  FileSerializeTest(Compression::UNCOMPRESSED);
+typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
+    BooleanType, ByteArrayType, FLBAType> TestTypes;
+
+TYPED_TEST_CASE(TestSerialize, TestTypes);
+
+TYPED_TEST(TestSerialize, SmallFileUncompressed) {
+  this->FileSerializeTest(Compression::UNCOMPRESSED);
 }
 
-TEST_F(TestSerialize, SmallFileSnappy) {
-  FileSerializeTest(Compression::SNAPPY);
+TYPED_TEST(TestSerialize, SmallFileSnappy) {
+  this->FileSerializeTest(Compression::SNAPPY);
 }
 
-TEST_F(TestSerialize, SmallFileGzip) {
-  FileSerializeTest(Compression::GZIP);
+TYPED_TEST(TestSerialize, SmallFileGzip) {
+  this->FileSerializeTest(Compression::GZIP);
 }
 
 }  // namespace test

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0bf72a96/src/parquet/file/metadata.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index 00ce990..9964882 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -355,13 +355,13 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
       int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size,
       int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback) {
     if (dictionary_page_offset > 0) {
+      column_chunk_->meta_data.__set_dictionary_page_offset(dictionary_page_offset);
       column_chunk_->__set_file_offset(dictionary_page_offset + compressed_size);
     } else {
       column_chunk_->__set_file_offset(data_page_offset + compressed_size);
     }
     column_chunk_->__isset.meta_data = true;
     column_chunk_->meta_data.__set_num_values(num_values);
-    column_chunk_->meta_data.__set_dictionary_page_offset(dictionary_page_offset);
     column_chunk_->meta_data.__set_index_page_offset(index_page_offset);
     column_chunk_->meta_data.__set_data_page_offset(data_page_offset);
     column_chunk_->meta_data.__set_total_uncompressed_size(uncompressed_size);