You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ju...@apache.org on 2016/02/21 01:11:59 UTC
parquet-cpp git commit: PARQUET-457: Verify page deserialization for GZIP and SNAPPY codecs, related refactoring
Repository: parquet-cpp
Updated Branches:
refs/heads/master 9cab887f2 -> 891985439
PARQUET-457: Verify page deserialization for GZIP and SNAPPY codecs, related refactoring
This also restores passing through the user's `CMAKE_CXX_FLAGS`; losing them had unfortunately allowed some compiler warnings to creep into our build.
Author: Wes McKinney <we...@apache.org>
Closes #58 from wesm/PARQUET-457 and squashes the following commits:
4bf12ed [Wes McKinney]
* SerializeThriftMsg now writes into an OutputStream.
* Refactor page serialization in advance of compression tests
* Test compression roundtrip on random bytes for snappy and gzip
* Trying LZO compression results in ParquetException
* Don't lose user's CMAKE_CXX_FLAGS
* Remove Travis CI directory caching for now
* Fix gzip memory leak if you do not call inflateEnd, deflateEnd
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/89198543
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/89198543
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/89198543
Branch: refs/heads/master
Commit: 89198543987ea8830501f35ba11581bf3a1b5a03
Parents: 9cab887
Author: Wes McKinney <we...@apache.org>
Authored: Sat Feb 20 16:11:41 2016 -0800
Committer: Julien Le Dem <ju...@dremio.com>
Committed: Sat Feb 20 16:11:41 2016 -0800
----------------------------------------------------------------------
.travis.yml | 3 -
CMakeLists.txt | 6 +-
src/parquet/column/levels-test.cc | 1 +
src/parquet/column/page.h | 16 +-
src/parquet/column/test-util.h | 29 ---
src/parquet/compression/CMakeLists.txt | 1 +
src/parquet/compression/codec.cc | 47 +++++
src/parquet/compression/codec.h | 8 +
src/parquet/compression/gzip-codec.cc | 31 ++-
src/parquet/encodings/plain-encoding-test.cc | 8 +-
src/parquet/file/file-deserialize-test.cc | 232 +++++++++++++++++-----
src/parquet/file/reader-internal.cc | 30 +--
src/parquet/schema/schema-descriptor-test.cc | 1 +
src/parquet/thrift/CMakeLists.txt | 2 -
src/parquet/thrift/serializer-test.cc | 75 -------
src/parquet/thrift/util.h | 11 +-
src/parquet/util/macros.h | 5 +
src/parquet/util/output.h | 4 +
18 files changed, 320 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index f93f232..24d2a20 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -19,9 +19,6 @@ addons:
- bison #needed for thrift cpp compilation
- flex #needed for thrift cpp compilation
- pkg-config #needed for thrift cpp compilation
-cache:
- directories:
- - $TRAVIS_BUILD_DIR/parquet-build
matrix:
include:
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c853993..218e74a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -231,11 +231,11 @@ set(CXX_FLAGS_RELEASE "-O3 -g")
string (TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE)
if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG")
- set(CMAKE_CXX_FLAGS ${CXX_FLAGS_DEBUG})
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_DEBUG}")
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "FASTDEBUG")
- set(CMAKE_CXX_FLAGS ${CXX_FLAGS_FASTDEBUG})
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_FASTDEBUG}")
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "RELEASE")
- set(CMAKE_CXX_FLAGS ${CXX_FLAGS_RELEASE})
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CXX_FLAGS_RELEASE}")
else()
message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
endif ()
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/column/levels-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels-test.cc b/src/parquet/column/levels-test.cc
index 62188db..0e3c20f 100644
--- a/src/parquet/column/levels-test.cc
+++ b/src/parquet/column/levels-test.cc
@@ -16,6 +16,7 @@
// under the License.
#include <cstdint>
+#include <memory>
#include <vector>
#include <string>
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/column/page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
index 3308a1c..916fd12 100644
--- a/src/parquet/column/page.h
+++ b/src/parquet/column/page.h
@@ -24,6 +24,7 @@
#include <cstdint>
#include <memory>
+#include <string>
#include "parquet/types.h"
@@ -93,13 +94,26 @@ class DataPage : public Page {
return definition_level_encoding_;
}
+ // DataPageHeader::statistics::max field, if it was set
+ const uint8_t* max() const {
+ return reinterpret_cast<const uint8_t*>(max_.c_str());
+ }
+
+ // DataPageHeader::statistics::min field, if it was set
+ const uint8_t* min() const {
+ return reinterpret_cast<const uint8_t*>(min_.c_str());
+ }
+
private:
int32_t num_values_;
Encoding::type encoding_;
Encoding::type definition_level_encoding_;
Encoding::type repetition_level_encoding_;
- // TODO(wesm): parquet::DataPageHeader.statistics
+ // So max/min can be populated privately
+ friend class SerializedPageReader;
+ std::string max_;
+ std::string min_;
};
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
index b346fc2..b12f340 100644
--- a/src/parquet/column/test-util.h
+++ b/src/parquet/column/test-util.h
@@ -32,7 +32,6 @@
// Depended on by SerializedPageReader test utilities for now
#include "parquet/encodings/plain-encoding.h"
-#include "parquet/thrift/util.h"
#include "parquet/util/input.h"
namespace parquet_cpp {
@@ -195,34 +194,6 @@ static std::shared_ptr<DataPage> MakeDataPage(const std::vector<T>& values,
} // namespace test
-// Utilities for testing the SerializedPageReader internally
-
-static inline void InitDataPage(const parquet::Statistics& stat,
- parquet::DataPageHeader& data_page, int32_t nvalues) {
- data_page.encoding = parquet::Encoding::PLAIN;
- data_page.definition_level_encoding = parquet::Encoding::RLE;
- data_page.repetition_level_encoding = parquet::Encoding::RLE;
- data_page.num_values = nvalues;
- data_page.__set_statistics(stat);
-}
-
-static inline void InitStats(size_t stat_size, parquet::Statistics& stat) {
- std::vector<char> stat_buffer;
- stat_buffer.resize(stat_size);
- for (int i = 0; i < stat_size; i++) {
- (reinterpret_cast<uint8_t*>(stat_buffer.data()))[i] = i % 255;
- }
- stat.__set_max(std::string(stat_buffer.data(), stat_size));
-}
-
-static inline void InitPageHeader(const parquet::DataPageHeader &data_page,
- parquet::PageHeader& page_header) {
- page_header.__set_data_page_header(data_page);
- page_header.uncompressed_page_size = 0;
- page_header.compressed_page_size = 0;
- page_header.type = parquet::PageType::DATA_PAGE;
-}
-
} // namespace parquet_cpp
#endif // PARQUET_COLUMN_TEST_UTIL_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/compression/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/compression/CMakeLists.txt b/src/parquet/compression/CMakeLists.txt
index 2c0b67c..f0ee110 100644
--- a/src/parquet/compression/CMakeLists.txt
+++ b/src/parquet/compression/CMakeLists.txt
@@ -16,6 +16,7 @@
# under the License.
add_library(parquet_compression STATIC
+ codec.cc
lz4-codec.cc
snappy-codec.cc
gzip-codec.cc
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/compression/codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/codec.cc b/src/parquet/compression/codec.cc
new file mode 100644
index 0000000..60d308e
--- /dev/null
+++ b/src/parquet/compression/codec.cc
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+
+#include "parquet/compression/codec.h"
+#include "parquet/exception.h"
+#include "parquet/types.h"
+
+namespace parquet_cpp {
+
+std::unique_ptr<Codec> Codec::Create(Compression::type codec_type) {
+ std::unique_ptr<Codec> result;
+ switch (codec_type) {
+ case Compression::UNCOMPRESSED:
+ break;
+ case Compression::SNAPPY:
+ result.reset(new SnappyCodec());
+ break;
+ case Compression::GZIP:
+ result.reset(new GZipCodec());
+ break;
+ case Compression::LZO:
+ ParquetException::NYI("LZO codec not implemented");
+ break;
+ default:
+ ParquetException::NYI("Unrecognized codec");
+ break;
+ }
+ return result;
+}
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/compression/codec.h
----------------------------------------------------------------------
diff --git a/src/parquet/compression/codec.h b/src/parquet/compression/codec.h
index 8fc4ada..bc73f02 100644
--- a/src/parquet/compression/codec.h
+++ b/src/parquet/compression/codec.h
@@ -19,16 +19,21 @@
#define PARQUET_COMPRESSION_CODEC_H
#include <cstdint>
+#include <memory>
#include <zlib.h>
#include "parquet/exception.h"
+#include "parquet/types.h"
namespace parquet_cpp {
class Codec {
public:
virtual ~Codec() {}
+
+ static std::unique_ptr<Codec> Create(Compression::type codec);
+
virtual void Decompress(int64_t input_len, const uint8_t* input,
int64_t output_len, uint8_t* output_buffer) = 0;
@@ -80,6 +85,7 @@ class GZipCodec : public Codec {
};
explicit GZipCodec(Format format = GZIP);
+ virtual ~GZipCodec();
virtual void Decompress(int64_t input_len, const uint8_t* input,
int64_t output_len, uint8_t* output_buffer);
@@ -109,6 +115,8 @@ class GZipCodec : public Codec {
// perform the refactoring then
void InitCompressor();
void InitDecompressor();
+ void EndCompressor();
+ void EndDecompressor();
bool compressor_initialized_;
bool decompressor_initialized_;
};
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/compression/gzip-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/gzip-codec.cc b/src/parquet/compression/gzip-codec.cc
index 6ec2726..f48fdad 100644
--- a/src/parquet/compression/gzip-codec.cc
+++ b/src/parquet/compression/gzip-codec.cc
@@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-#include "parquet/compression/codec.h"
-
#include <cstring>
#include <sstream>
+#include <string>
+
+#include "parquet/compression/codec.h"
+#include "parquet/exception.h"
namespace parquet_cpp {
@@ -40,7 +42,13 @@ GZipCodec::GZipCodec(Format format) :
decompressor_initialized_(false) {
}
+GZipCodec::~GZipCodec() {
+ EndCompressor();
+ EndDecompressor();
+}
+
void GZipCodec::InitCompressor() {
+ EndDecompressor();
memset(&stream_, 0, sizeof(stream_));
int ret;
@@ -58,12 +66,18 @@ void GZipCodec::InitCompressor() {
}
compressor_initialized_ = true;
- decompressor_initialized_ = false;
+}
+
+void GZipCodec::EndCompressor() {
+ if (compressor_initialized_) {
+ (void)deflateEnd(&stream_);
+ }
+ compressor_initialized_ = false;
}
void GZipCodec::InitDecompressor() {
+ EndCompressor();
memset(&stream_, 0, sizeof(stream_));
-
int ret;
// Initialize to run either deflate or zlib/gzip format
@@ -71,11 +85,16 @@ void GZipCodec::InitDecompressor() {
if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) {
throw ParquetException("zlib inflateInit failed: " + std::string(stream_.msg));
}
-
- compressor_initialized_ = false;
decompressor_initialized_ = true;
}
+void GZipCodec::EndDecompressor() {
+ if (decompressor_initialized_) {
+ (void)inflateEnd(&stream_);
+ }
+ decompressor_initialized_ = false;
+}
+
void GZipCodec::Decompress(int64_t input_length, const uint8_t* input,
int64_t output_length, uint8_t* output) {
if (!decompressor_initialized_) {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/encodings/plain-encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding-test.cc b/src/parquet/encodings/plain-encoding-test.cc
index b8ef13b..5091dc8 100644
--- a/src/parquet/encodings/plain-encoding-test.cc
+++ b/src/parquet/encodings/plain-encoding-test.cc
@@ -17,11 +17,13 @@
#include <cstdint>
#include <cstdlib>
+#include <cstring>
#include <string>
#include <vector>
#include <gtest/gtest.h>
+#include "parquet/schema/descriptor.h"
#include "parquet/encodings/plain-encoding.h"
#include "parquet/types.h"
#include "parquet/schema/types.h"
@@ -80,7 +82,7 @@ class EncodeDecode{
void generate_data() {
// seed the prng so failure is deterministic
- random_numbers(num_values_, 0.5, draws_);
+ random_numbers(num_values_, 0, draws_);
}
void encode_decode(ColumnDescriptor *d) {
@@ -141,7 +143,7 @@ void EncodeDecode<ByteArray, Type::BYTE_ARRAY>::generate_data() {
int max_byte_array_len = 12 + sizeof(uint32_t);
size_t nbytes = num_values_ * max_byte_array_len;
data_buffer_.resize(nbytes);
- random_byte_array(num_values_, 0.5, data_buffer_.data(), draws_,
+ random_byte_array(num_values_, 0, data_buffer_.data(), draws_,
max_byte_array_len);
}
@@ -160,7 +162,7 @@ void EncodeDecode<FLBA, Type::FIXED_LEN_BYTE_ARRAY>::generate_data() {
size_t nbytes = num_values_ * flba_length;
data_buffer_.resize(nbytes);
ASSERT_EQ(nbytes, data_buffer_.size());
- random_fixed_byte_array(num_values_, 0.5, data_buffer_.data(), flba_length, draws_);
+ random_fixed_byte_array(num_values_, 0, data_buffer_.data(), flba_length, draws_);
}
template<>
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/file/file-deserialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc
index e90889d..cfb3e86 100644
--- a/src/parquet/file/file-deserialize-test.cc
+++ b/src/parquet/file/file-deserialize-test.cc
@@ -20,92 +20,224 @@
#include <algorithm>
#include <cstdlib>
#include <cstdint>
+#include <cstring>
#include <exception>
#include <memory>
#include <string>
+#include <vector>
#include "parquet/column/page.h"
-#include "parquet/column/test-util.h"
-
+#include "parquet/compression/codec.h"
+#include "parquet/exception.h"
#include "parquet/file/reader-internal.h"
#include "parquet/thrift/parquet_types.h"
#include "parquet/thrift/util.h"
#include "parquet/types.h"
#include "parquet/util/input.h"
+#include "parquet/util/output.h"
+#include "parquet/util/test-common.h"
namespace parquet_cpp {
-class TestSerializedPage : public ::testing::Test {
+
+// Adds page statistics occupying a certain amount of bytes (for testing very
+// large page headers)
+static inline void AddDummyStats(size_t stat_size,
+ parquet::DataPageHeader& data_page) {
+
+ std::vector<uint8_t> stat_bytes(stat_size);
+ // Some non-zero value
+ std::fill(stat_bytes.begin(), stat_bytes.end(), 1);
+ data_page.statistics.__set_max(std::string(
+ reinterpret_cast<const char*>(stat_bytes.data()), stat_size));
+ data_page.__isset.statistics = true;
+}
+
+class TestPageSerde : public ::testing::Test {
public:
- void InitSerializedPageReader(const uint8_t* buffer, size_t header_size,
- Compression::type codec) {
+ void SetUp() {
+ data_page_header_.encoding = parquet::Encoding::PLAIN;
+ data_page_header_.definition_level_encoding = parquet::Encoding::RLE;
+ data_page_header_.repetition_level_encoding = parquet::Encoding::RLE;
+
+ ResetStream();
+ }
+
+ void InitSerializedPageReader(Compression::type codec =
+ Compression::UNCOMPRESSED) {
+ EndStream();
std::unique_ptr<InputStream> stream;
- stream.reset(new InMemoryInputStream(buffer, header_size));
+ stream.reset(new InMemoryInputStream(out_buffer_.data(),
+ out_buffer_.size()));
page_reader_.reset(new SerializedPageReader(std::move(stream), codec));
}
+ void WriteDataPageHeader(int max_serialized_len = 1024,
+ int32_t uncompressed_size = 0, int32_t compressed_size = 0) {
+ // Simplifying writing serialized data page headers which may or may not
+ // have meaningful data associated with them
+
+ // Serialize the Page header
+ uint32_t serialized_len = max_serialized_len;
+ page_header_.__set_data_page_header(data_page_header_);
+ page_header_.uncompressed_page_size = uncompressed_size;
+ page_header_.compressed_page_size = compressed_size;
+ page_header_.type = parquet::PageType::DATA_PAGE;
+
+ ASSERT_NO_THROW(SerializeThriftMsg(&page_header_, max_serialized_len,
+ out_stream_.get()));
+ }
+
+ void ResetStream() {
+ out_buffer_.resize(0);
+ out_stream_.reset(new InMemoryOutputStream());
+ }
+
+ void EndStream() {
+ out_stream_->Transfer(&out_buffer_);
+ }
+
protected:
+ std::unique_ptr<InMemoryOutputStream> out_stream_;
+
+ // TODO(wesm): Owns the results of the output stream. To be refactored
+ std::vector<uint8_t> out_buffer_;
+
std::unique_ptr<SerializedPageReader> page_reader_;
+ parquet::PageHeader page_header_;
+ parquet::DataPageHeader data_page_header_;
};
-TEST_F(TestSerializedPage, TestLargePageHeaders) {
- parquet::PageHeader in_page_header;
- parquet::DataPageHeader data_page_header;
+void CheckDataPageHeader(const parquet::DataPageHeader expected,
+ const Page* page) {
+ ASSERT_EQ(PageType::DATA_PAGE, page->type());
+
+ const DataPage* data_page = static_cast<const DataPage*>(page);
+ ASSERT_EQ(expected.num_values, data_page->num_values());
+ ASSERT_EQ(expected.encoding, data_page->encoding());
+ ASSERT_EQ(expected.definition_level_encoding,
+ data_page->definition_level_encoding());
+ ASSERT_EQ(expected.repetition_level_encoding,
+ data_page->repetition_level_encoding());
+
+ if (expected.statistics.__isset.max) {
+ ASSERT_EQ(0, memcmp(expected.statistics.max.c_str(),
+ data_page->max(), expected.statistics.max.length()));
+ }
+ if (expected.statistics.__isset.min) {
+ ASSERT_EQ(0, memcmp(expected.statistics.min.c_str(),
+ data_page->min(), expected.statistics.min.length()));
+ }
+}
+
+TEST_F(TestPageSerde, DataPage) {
parquet::PageHeader out_page_header;
- parquet::Statistics stats;
- int expected_header_size = 512 * 1024; //512 KB
+
+ int stats_size = 512;
+ AddDummyStats(stats_size, data_page_header_);
+ data_page_header_.num_values = 4444;
+
+ WriteDataPageHeader();
+ InitSerializedPageReader();
+ std::shared_ptr<Page> current_page = page_reader_->NextPage();
+ CheckDataPageHeader(data_page_header_, current_page.get());
+}
+
+TEST_F(TestPageSerde, TestLargePageHeaders) {
int stats_size = 256 * 1024; // 256 KB
- std::string serialized_buffer;
- int num_values = 4141;
+ AddDummyStats(stats_size, data_page_header_);
- InitStats(stats_size, stats);
- InitDataPage(stats, data_page_header, num_values);
- InitPageHeader(data_page_header, in_page_header);
+ // Any number to verify metadata roundtrip
+ data_page_header_.num_values = 4141;
- // Serialize the Page header
- ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header,
- expected_header_size));
- // check header size is between 256 KB to 16 MB
- ASSERT_LE(stats_size, serialized_buffer.length());
- ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length());
+ int max_header_size = 512 * 1024; // 512 KB
+ WriteDataPageHeader(max_header_size);
+ ASSERT_GE(max_header_size, out_stream_->Tell());
- InitSerializedPageReader(reinterpret_cast<const uint8_t*>(serialized_buffer.c_str()),
- serialized_buffer.length(), Compression::UNCOMPRESSED);
+ // check header size is between 256 KB to 16 MB
+ ASSERT_LE(stats_size, out_stream_->Tell());
+ ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, out_stream_->Tell());
+ InitSerializedPageReader();
std::shared_ptr<Page> current_page = page_reader_->NextPage();
- ASSERT_EQ(PageType::DATA_PAGE, current_page->type());
- const DataPage* page = static_cast<const DataPage*>(current_page.get());
- ASSERT_EQ(num_values, page->num_values());
+ CheckDataPageHeader(data_page_header_, current_page.get());
}
-TEST_F(TestSerializedPage, TestFailLargePageHeaders) {
- parquet::PageHeader in_page_header;
- parquet::DataPageHeader data_page_header;
- parquet::PageHeader out_page_header;
- parquet::Statistics stats;
- int expected_header_size = 512 * 1024; // 512 KB
+TEST_F(TestPageSerde, TestFailLargePageHeaders) {
int stats_size = 256 * 1024; // 256 KB
- int max_header_size = 128 * 1024; // 128 KB
- int num_values = 4141;
- std::string serialized_buffer;
-
- InitStats(stats_size, stats);
- InitDataPage(stats, data_page_header, num_values);
- InitPageHeader(data_page_header, in_page_header);
+ AddDummyStats(stats_size, data_page_header_);
// Serialize the Page header
- ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header,
- expected_header_size));
- // check header size is between 256 KB to 16 MB
- ASSERT_LE(stats_size, serialized_buffer.length());
- ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, serialized_buffer.length());
+ int max_header_size = 512 * 1024; // 512 KB
+ WriteDataPageHeader(max_header_size);
+ ASSERT_GE(max_header_size, out_stream_->Tell());
- InitSerializedPageReader(reinterpret_cast<const uint8_t*>(serialized_buffer.c_str()),
- serialized_buffer.length(), Compression::UNCOMPRESSED);
-
- // Set the max page header size to 128 KB, which is less than the current header size
- page_reader_->set_max_page_header_size(max_header_size);
+ int smaller_max_size = 128 * 1024;
+ ASSERT_LE(smaller_max_size, out_stream_->Tell());
+ InitSerializedPageReader();
+ // Set the max page header size to 128 KB, which is less than the current
+ // header size
+ page_reader_->set_max_page_header_size(smaller_max_size);
ASSERT_THROW(page_reader_->NextPage(), ParquetException);
}
+
+TEST_F(TestPageSerde, Compression) {
+ Compression::type codec_types[2] = {Compression::GZIP, Compression::SNAPPY};
+
+ // This is a dummy number
+ data_page_header_.num_values = 32;
+
+ int num_pages = 10;
+
+ std::vector<std::vector<uint8_t> > faux_data;
+ faux_data.resize(num_pages);
+ for (int i = 0; i < num_pages; ++i) {
+ // The pages keep getting larger
+ int page_size = (i + 1) * 64;
+ test::random_bytes(page_size, 0, &faux_data[i]);
+ }
+ for (auto codec_type : codec_types) {
+ std::unique_ptr<Codec> codec = Codec::Create(codec_type);
+
+ std::vector<uint8_t> buffer;
+ for (int i = 0; i < num_pages; ++i) {
+ const uint8_t* data = faux_data[i].data();
+ size_t data_size = faux_data[i].size();
+
+ int64_t max_compressed_size = codec->MaxCompressedLen(data_size, data);
+ buffer.resize(max_compressed_size);
+
+ int64_t actual_size = codec->Compress(data_size, data,
+ max_compressed_size, &buffer[0]);
+
+ WriteDataPageHeader(1024, data_size, actual_size);
+ out_stream_->Write(buffer.data(), actual_size);
+ }
+
+ InitSerializedPageReader(codec_type);
+
+ std::shared_ptr<Page> page;
+ const DataPage* data_page;
+ for (int i = 0; i < num_pages; ++i) {
+ size_t data_size = faux_data[i].size();
+ page = page_reader_->NextPage();
+ data_page = static_cast<const DataPage*>(page.get());
+ ASSERT_EQ(data_size, data_page->size());
+ ASSERT_EQ(0, memcmp(faux_data[i].data(), data_page->data(), data_size));
+ }
+
+ ResetStream();
+ }
+}
+
+TEST_F(TestPageSerde, LZONotSupported) {
+ // Must await PARQUET-530
+ int data_size = 1024;
+ std::vector<uint8_t> faux_data(data_size);
+ WriteDataPageHeader(1024, data_size, data_size);
+ out_stream_->Write(faux_data.data(), data_size);
+ ASSERT_THROW(InitSerializedPageReader(Compression::LZO), ParquetException);
+}
+
} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index 47092a5..0a93b00 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -21,6 +21,7 @@
#include <algorithm>
#include <exception>
#include <ostream>
+#include <string>
#include <vector>
#include "parquet/column/page.h"
@@ -40,22 +41,10 @@ namespace parquet_cpp {
// assembled in a serialized stream for storing in a Parquet files
SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream,
- Compression::type codec) :
+ Compression::type codec_type) :
stream_(std::move(stream)) {
max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE;
- // TODO(wesm): add GZIP after PARQUET-456
- switch (codec) {
- case Compression::UNCOMPRESSED:
- break;
- case Compression::SNAPPY:
- decompressor_.reset(new SnappyCodec());
- break;
- case Compression::LZO:
- decompressor_.reset(new Lz4Codec());
- break;
- default:
- ParquetException::NYI("Reading compressed data");
- }
+ decompressor_ = Codec::Create(codec_type);
}
std::shared_ptr<Page> SerializedPageReader::NextPage() {
@@ -126,11 +115,22 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
} else if (current_page_header_.type == parquet::PageType::DATA_PAGE) {
const parquet::DataPageHeader& header = current_page_header_.data_page_header;
- return std::make_shared<DataPage>(buffer, uncompressed_len,
+ auto page = std::make_shared<DataPage>(buffer, uncompressed_len,
header.num_values,
FromThrift(header.encoding),
FromThrift(header.definition_level_encoding),
FromThrift(header.repetition_level_encoding));
+
+ if (header.__isset.statistics) {
+ const parquet::Statistics stats = header.statistics;
+ if (stats.__isset.max) {
+ page->max_ = stats.max;
+ }
+ if (stats.__isset.min) {
+ page->min_ = stats.min;
+ }
+ }
+ return page;
} else if (current_page_header_.type == parquet::PageType::DATA_PAGE_V2) {
const parquet::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
bool is_compressed = header.__isset.is_compressed? header.is_compressed : false;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/schema/schema-descriptor-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema/schema-descriptor-test.cc b/src/parquet/schema/schema-descriptor-test.cc
index c63df54..83b136d 100644
--- a/src/parquet/schema/schema-descriptor-test.cc
+++ b/src/parquet/schema/schema-descriptor-test.cc
@@ -27,6 +27,7 @@
#include "parquet/exception.h"
#include "parquet/schema/descriptor.h"
#include "parquet/schema/types.h"
+#include "parquet/types.h"
using std::string;
using std::vector;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/thrift/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/thrift/CMakeLists.txt b/src/parquet/thrift/CMakeLists.txt
index 29b8ef8..f43c2a5 100644
--- a/src/parquet/thrift/CMakeLists.txt
+++ b/src/parquet/thrift/CMakeLists.txt
@@ -44,5 +44,3 @@ add_custom_command(
COMMENT "Running thrift compiler on parquet.thrift"
VERBATIM
)
-
-ADD_PARQUET_TEST(serializer-test)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/thrift/serializer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/thrift/serializer-test.cc b/src/parquet/thrift/serializer-test.cc
deleted file mode 100644
index 756fd10..0000000
--- a/src/parquet/thrift/serializer-test.cc
+++ /dev/null
@@ -1,75 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include <cstdint>
-#include <exception>
-#include <string>
-
-#include "parquet/column/test-util.h"
-#include "parquet/thrift/parquet_types.h"
-#include "parquet/thrift/util.h"
-
-using std::string;
-
-namespace parquet_cpp {
-
-class TestThrift : public ::testing::Test {
-
-};
-
-TEST_F(TestThrift, TestSerializerDeserializer) {
- parquet::PageHeader in_page_header;
- parquet::DataPageHeader data_page_header;
- parquet::PageHeader out_page_header;
- parquet::Statistics stats;
- uint32_t max_header_len = 1024;
- uint32_t expected_header_size = 1024;
- uint32_t stats_size = 512;
- std::string serialized_buffer;
- int num_values = 4444;
-
- InitStats(stats_size, stats);
- InitDataPage(stats, data_page_header, num_values);
- InitPageHeader(data_page_header, in_page_header);
-
- // Serialize the Page header
- ASSERT_NO_THROW(serialized_buffer = SerializeThriftMsg(&in_page_header, expected_header_size));
- ASSERT_LE(stats_size, serialized_buffer.length());
- ASSERT_GE(max_header_len, serialized_buffer.length());
-
- uint32_t header_size = 1024;
- // Deserialize the serialized page buffer
- ASSERT_NO_THROW(DeserializeThriftMsg(reinterpret_cast<const uint8_t*>(serialized_buffer.c_str()),
- &header_size, &out_page_header));
- ASSERT_LE(stats_size, header_size);
- ASSERT_GE(max_header_len, header_size);
-
- ASSERT_EQ(parquet::Encoding::PLAIN, out_page_header.data_page_header.encoding);
- ASSERT_EQ(parquet::Encoding::RLE, out_page_header.data_page_header.definition_level_encoding);
- ASSERT_EQ(parquet::Encoding::RLE, out_page_header.data_page_header.repetition_level_encoding);
- for(int i = 0; i < stats_size; i++){
- EXPECT_EQ(i % 255, (reinterpret_cast<const uint8_t*>
- (out_page_header.data_page_header.statistics.max.c_str()))[i]);
- }
- ASSERT_EQ(parquet::PageType::DATA_PAGE, out_page_header.type);
- ASSERT_EQ(num_values, out_page_header.data_page_header.num_values);
-
-}
-
-} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/thrift/util.h
----------------------------------------------------------------------
diff --git a/src/parquet/thrift/util.h b/src/parquet/thrift/util.h
index 8c34197..5f29820 100644
--- a/src/parquet/thrift/util.h
+++ b/src/parquet/thrift/util.h
@@ -18,8 +18,9 @@
#include <sstream>
#include "parquet/exception.h"
-#include "parquet/util/logging.h"
#include "parquet/thrift/parquet_types.h"
+#include "parquet/util/logging.h"
+#include "parquet/util/output.h"
namespace parquet_cpp {
@@ -77,7 +78,7 @@ inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deseriali
// The arguments are the object to be serialized and
// the expected size of the serialized object
template <class T>
-inline std::string SerializeThriftMsg(T* obj, uint32_t len) {
+inline void SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) {
boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> mem_buffer(
new apache::thrift::transport::TMemoryBuffer(len));
apache::thrift::protocol::TCompactProtocolFactoryT<
@@ -92,7 +93,11 @@ inline std::string SerializeThriftMsg(T* obj, uint32_t len) {
ss << "Couldn't serialize thrift: " << e.what() << "\n";
throw ParquetException(ss.str());
}
- return mem_buffer->getBufferAsString();
+
+ uint8_t* out_buffer;
+ uint32_t out_length;
+ mem_buffer->getBuffer(&out_buffer, &out_length);
+ out->Write(out_buffer, out_length);
}
} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/util/macros.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/macros.h b/src/parquet/util/macros.h
index 7b301d6..d221173 100644
--- a/src/parquet/util/macros.h
+++ b/src/parquet/util/macros.h
@@ -20,6 +20,11 @@
// Useful macros from elsewhere
+// From Google gutil
+#define DISALLOW_COPY_AND_ASSIGN(TypeName) \
+ TypeName(const TypeName&) = delete; \
+ void operator=(const TypeName&) = delete
+
// ----------------------------------------------------------------------
// From googletest
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/89198543/src/parquet/util/output.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/output.h b/src/parquet/util/output.h
index be25abd..68a09e2 100644
--- a/src/parquet/util/output.h
+++ b/src/parquet/util/output.h
@@ -21,6 +21,8 @@
#include <cstdint>
#include <vector>
+#include "parquet/util/macros.h"
+
namespace parquet_cpp {
// ----------------------------------------------------------------------
@@ -63,6 +65,8 @@ class InMemoryOutputStream : public OutputStream {
std::vector<uint8_t> buffer_;
int64_t size_;
int64_t capacity_;
+
+ DISALLOW_COPY_AND_ASSIGN(InMemoryOutputStream);
};
} // namespace parquet_cpp