You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by uw...@apache.org on 2017/06/26 07:05:25 UTC
[2/6] parquet-cpp git commit: PARQUET-858: Flatten column directory,
minor code consolidation
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
new file mode 100644
index 0000000..798c7ba
--- /dev/null
+++ b/src/parquet/column_writer-test.cc
@@ -0,0 +1,729 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "parquet/column_reader.h"
+#include "parquet/column_writer.h"
+#include "parquet/file/reader-internal.h"
+#include "parquet/file/writer-internal.h"
+#include "parquet/test-specialization.h"
+#include "parquet/test-util.h"
+#include "parquet/types.h"
+#include "parquet/util/comparison.h"
+#include "parquet/util/memory.h"
+
+namespace parquet {
+
+using schema::NodePtr;
+using schema::PrimitiveNode;
+
+namespace test {
+
+// Row count used by the majority of the tests.
+constexpr int SMALL_SIZE = 100;
+// Bigger row count, reserved for the few tests that exercise corner cases.
+constexpr int LARGE_SIZE = 100000;
+// Very large row count used by the dictionary fallback test.
+constexpr int VERY_LARGE_SIZE = 400000;
+
+// Fixture that writes one primitive column of TestType into an in-memory sink
+// and reads it back, so that tests can compare the round-tripped values.
+//
+// NOTE(review): descr_ is assigned once in SetUp(); tests that call
+// SetUpSchema() again appear to rely on schema_.Column(0) still referring to a
+// valid descriptor - confirm against PrimitiveTypedTest.
+template <typename TestType>
+class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
+ public:
+  typedef typename TestType::c_type T;
+
+  // Sizes the output buffers for SMALL_SIZE values and installs a REQUIRED schema.
+  void SetUp() {
+    this->SetupValuesOut(SMALL_SIZE);
+    writer_properties_ = default_writer_properties();
+    definition_levels_out_.resize(SMALL_SIZE);
+    repetition_levels_out_.resize(SMALL_SIZE);
+
+    this->SetUpSchema(Repetition::REQUIRED);
+
+    descr_ = this->schema_.Column(0);
+  }
+
+  Type::type type_num() { return TestType::type_num; }
+
+  // Creates reader_ on top of whatever has been written to sink_ so far.
+  void BuildReader(
+      int64_t num_rows, Compression::type compression = Compression::UNCOMPRESSED) {
+    auto buffer = sink_->GetBuffer();
+    std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
+    std::unique_ptr<SerializedPageReader> page_reader(
+        new SerializedPageReader(std::move(source), num_rows, compression));
+    reader_.reset(new TypedColumnReader<TestType>(this->descr_, std::move(page_reader)));
+  }
+
+  // Creates a fresh sink, metadata builder and column writer.
+  //
+  // Dictionary encoding is enabled when column_properties asks for
+  // PLAIN_DICTIONARY or RLE_DICTIONARY; otherwise it is disabled and the
+  // requested encoding is used as is.
+  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter(
+      int64_t output_size = SMALL_SIZE,
+      const ColumnProperties& column_properties = ColumnProperties()) {
+    sink_.reset(new InMemoryOutputStream());
+
+    // Build the writer properties first: the metadata builder below must be
+    // handed the same properties as the column writer, not the ones left over
+    // from SetUp() or from a previous BuildWriter() call.
+    WriterProperties::Builder wp_builder;
+    if (column_properties.encoding == Encoding::PLAIN_DICTIONARY ||
+        column_properties.encoding == Encoding::RLE_DICTIONARY) {
+      wp_builder.enable_dictionary();
+    } else {
+      wp_builder.disable_dictionary();
+      wp_builder.encoding(column_properties.encoding);
+    }
+    writer_properties_ = wp_builder.build();
+
+    metadata_ = ColumnChunkMetaDataBuilder::Make(
+        writer_properties_, this->descr_, reinterpret_cast<uint8_t*>(&thrift_metadata_));
+    std::unique_ptr<SerializedPageWriter> pager(
+        new SerializedPageWriter(sink_.get(), column_properties.codec, metadata_.get()));
+    std::shared_ptr<ColumnWriter> writer = ColumnWriter::Make(
+        metadata_.get(), std::move(pager), output_size, writer_properties_.get());
+    return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
+  }
+
+  // Issues a single ReadBatch call for values_out_.size() values.
+  void ReadColumn(Compression::type compression = Compression::UNCOMPRESSED) {
+    BuildReader(static_cast<int64_t>(this->values_out_.size()), compression);
+    reader_->ReadBatch(static_cast<int>(this->values_out_.size()),
+        definition_levels_out_.data(), repetition_levels_out_.data(),
+        this->values_out_ptr_, &values_read_);
+    this->SyncValuesOut();
+  }
+
+  // Calls ReadBatch repeatedly until values_out_ has been filled.
+  void ReadColumnFully(Compression::type compression = Compression::UNCOMPRESSED);
+
+  // Round-trips SMALL_SIZE uncompressed values with the given encoding.
+  void TestRequiredWithEncoding(Encoding::type encoding) {
+    return TestRequiredWithSettings(encoding, Compression::UNCOMPRESSED, false, false);
+  }
+
+  // Generates num_rows values and round-trips them twice: once through
+  // WriteBatch and once through WriteBatchSpaced.
+  void TestRequiredWithSettings(Encoding::type encoding, Compression::type compression,
+      bool enable_dictionary, bool enable_statistics, int64_t num_rows = SMALL_SIZE) {
+    this->GenerateData(num_rows);
+
+    this->WriteRequiredWithSettings(
+        encoding, compression, enable_dictionary, enable_statistics, num_rows);
+    this->ReadAndCompare(compression, num_rows);
+
+    this->WriteRequiredWithSettingsSpaced(
+        encoding, compression, enable_dictionary, enable_statistics, num_rows);
+    this->ReadAndCompare(compression, num_rows);
+  }
+
+  // Writes values_ through WriteBatch without any levels.
+  void WriteRequiredWithSettings(Encoding::type encoding, Compression::type compression,
+      bool enable_dictionary, bool enable_statistics, int64_t num_rows) {
+    ColumnProperties column_properties(
+        encoding, compression, enable_dictionary, enable_statistics);
+    std::shared_ptr<TypedColumnWriter<TestType>> writer =
+        this->BuildWriter(num_rows, column_properties);
+    writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
+    // The behaviour should be independent from the number of Close() calls
+    writer->Close();
+    writer->Close();
+  }
+
+  // Writes values_ through WriteBatchSpaced with every validity bit set.
+  void WriteRequiredWithSettingsSpaced(Encoding::type encoding,
+      Compression::type compression, bool enable_dictionary, bool enable_statistics,
+      int64_t num_rows) {
+    std::vector<uint8_t> valid_bits(
+        BitUtil::RoundUpNumBytes(static_cast<uint32_t>(this->values_.size())) + 1, 255);
+    ColumnProperties column_properties(
+        encoding, compression, enable_dictionary, enable_statistics);
+    std::shared_ptr<TypedColumnWriter<TestType>> writer =
+        this->BuildWriter(num_rows, column_properties);
+    writer->WriteBatchSpaced(
+        this->values_.size(), nullptr, nullptr, valid_bits.data(), 0, this->values_ptr_);
+    // The behaviour should be independent from the number of Close() calls
+    writer->Close();
+    writer->Close();
+  }
+
+  // Reads num_rows values back and asserts that they match values_, both
+  // element-wise through Compare<T> and through operator== on the vectors.
+  void ReadAndCompare(Compression::type compression, int64_t num_rows) {
+    this->SetupValuesOut(num_rows);
+    this->ReadColumnFully(compression);
+    Compare<T> compare(this->descr_);
+    for (size_t i = 0; i < this->values_.size(); i++) {
+      // Print the index of the first mismatch before the assertions abort.
+      if (compare(this->values_[i], this->values_out_[i]) ||
+          compare(this->values_out_[i], this->values_[i])) {
+        std::cout << "Failed at " << i << std::endl;
+      }
+      ASSERT_FALSE(compare(this->values_[i], this->values_out_[i]));
+      ASSERT_FALSE(compare(this->values_out_[i], this->values_[i]));
+    }
+    ASSERT_EQ(this->values_, this->values_out_);
+  }
+
+  int64_t metadata_num_values() {
+    // Metadata accessor must be created lazily.
+    // This is because the ColumnChunkMetaData semantics dictate the metadata object is
+    // complete (no changes to the metadata buffer can be made after instantiation)
+    auto metadata_accessor = ColumnChunkMetaData::Make(
+        reinterpret_cast<const uint8_t*>(&thrift_metadata_), this->descr_);
+    return metadata_accessor->num_values();
+  }
+
+  std::vector<Encoding::type> metadata_encodings() {
+    // Metadata accessor must be created lazily.
+    // This is because the ColumnChunkMetaData semantics dictate the metadata object is
+    // complete (no changes to the metadata buffer can be made after instantiation)
+    auto metadata_accessor = ColumnChunkMetaData::Make(
+        reinterpret_cast<const uint8_t*>(&thrift_metadata_), this->descr_);
+    return metadata_accessor->encodings();
+  }
+
+ protected:
+  // Number of values produced by the most recent ReadColumn()/ReadColumnFully().
+  int64_t values_read_ = 0;
+  // Keep the reader alive as for ByteArray the lifetime of the ByteArray
+  // content is bound to the reader.
+  std::unique_ptr<TypedColumnReader<TestType>> reader_;
+
+  std::vector<int16_t> definition_levels_out_;
+  std::vector<int16_t> repetition_levels_out_;
+
+  const ColumnDescriptor* descr_;
+
+ private:
+  format::ColumnChunk thrift_metadata_;
+  std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
+  std::unique_ptr<InMemoryOutputStream> sink_;
+  std::shared_ptr<WriterProperties> writer_properties_;
+  // Backing storage for copied value bytes (filled by the FLBA ReadColumnFully).
+  std::vector<std::vector<uint8_t>> data_buffer_;
+};
+
+// Reads the column back in as many ReadBatch calls as it takes to fill
+// values_out_, accumulating the number of values in values_read_.
+//
+// NOTE(review): the level output pointers are advanced by values_read_ while
+// the level vectors are only sized in SetUp(); this looks safe only while the
+// reader does not write levels for the column - confirm.
+template <typename TestType>
+void TestPrimitiveWriter<TestType>::ReadColumnFully(Compression::type compression) {
+  const int64_t total_values = static_cast<int64_t>(this->values_out_.size());
+  BuildReader(total_values, compression);
+  values_read_ = 0;
+  while (values_read_ < total_values) {
+    int64_t values_read_recently = 0;
+    const int64_t levels_read = reader_->ReadBatch(
+        static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_),
+        definition_levels_out_.data() + values_read_,
+        repetition_levels_out_.data() + values_read_,
+        this->values_out_ptr_ + values_read_, &values_read_recently);
+    // Stop when the reader makes no progress at all; otherwise a short column
+    // would make the test hang instead of failing in the caller's assertions.
+    if (levels_read == 0 && values_read_recently == 0) { break; }
+    values_read_ += values_read_recently;
+  }
+  this->SyncValuesOut();
+}
+
+// FLBA specialisation: in addition to reading in batches, the bytes behind
+// every returned value pointer are copied into data_buffer_ and the pointers
+// are redirected to those copies.
+template <>
+void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compression) {
+  const int64_t total_values = static_cast<int64_t>(this->values_out_.size());
+  BuildReader(total_values, compression);
+  this->data_buffer_.clear();
+
+  values_read_ = 0;
+  while (values_read_ < total_values) {
+    int64_t values_read_recently = 0;
+    const int64_t levels_read = reader_->ReadBatch(
+        static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_),
+        definition_levels_out_.data() + values_read_,
+        repetition_levels_out_.data() + values_read_,
+        this->values_out_ptr_ + values_read_, &values_read_recently);
+    // Stop when the reader makes no progress at all; otherwise a short column
+    // would make the test hang instead of failing in the caller's assertions.
+    if (levels_read == 0 && values_read_recently == 0) { break; }
+
+    // Copy contents of the pointers
+    const int type_length = this->descr_->type_length();
+    std::vector<uint8_t> data(values_read_recently * type_length);
+    uint8_t* data_ptr = data.data();
+    for (int64_t i = 0; i < values_read_recently; i++) {
+      memcpy(data_ptr + type_length * i, this->values_out_[i + values_read_].ptr,
+          type_length);
+      this->values_out_[i + values_read_].ptr = data_ptr + type_length * i;
+    }
+    // Moving the vector keeps its heap buffer, so the pointers stored above
+    // stay valid.
+    data_buffer_.emplace_back(std::move(data));
+
+    values_read_ += values_read_recently;
+  }
+  this->SyncValuesOut();
+}
+
+// Physical types every TYPED_TEST of TestPrimitiveWriter is instantiated for.
+using TestTypes = ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType,
+    DoubleType, BooleanType, ByteArrayType, FLBAType>;
+
+TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);
+
+// Int32 instantiation of the fixture, used by the NULL value test.
+using TestNullValuesWriter = TestPrimitiveWriter<Int32Type>;
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
+  // Round-trip a REQUIRED column using the PLAIN encoding.
+  const Encoding::type encoding = Encoding::PLAIN;
+  this->TestRequiredWithEncoding(encoding);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredDictionary) {
+  // Round-trip a REQUIRED column using the PLAIN_DICTIONARY encoding.
+  const Encoding::type encoding = Encoding::PLAIN_DICTIONARY;
+  this->TestRequiredWithEncoding(encoding);
+}
+
+/*
+TYPED_TEST(TestPrimitiveWriter, RequiredRLE) {
+ this->TestRequiredWithEncoding(Encoding::RLE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredBitPacked) {
+ this->TestRequiredWithEncoding(Encoding::BIT_PACKED);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredDeltaBinaryPacked) {
+ this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
+ this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) {
+ this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
+ this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY);
+}
+*/
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithSnappyCompression) {
+  // PLAIN + SNAPPY over LARGE_SIZE rows; dictionary and statistics flags off.
+  const bool enable_dictionary = false;
+  const bool enable_statistics = false;
+  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY,
+      enable_dictionary, enable_statistics, LARGE_SIZE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithBrotliCompression) {
+  // PLAIN + BROTLI over LARGE_SIZE rows; dictionary and statistics flags off.
+  const bool enable_dictionary = false;
+  const bool enable_statistics = false;
+  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI,
+      enable_dictionary, enable_statistics, LARGE_SIZE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompression) {
+  // PLAIN + GZIP over LARGE_SIZE rows; dictionary and statistics flags off.
+  const bool enable_dictionary = false;
+  const bool enable_statistics = false;
+  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP,
+      enable_dictionary, enable_statistics, LARGE_SIZE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) {
+  // Uncompressed PLAIN over LARGE_SIZE rows with the statistics flag on.
+  const bool enable_dictionary = false;
+  const bool enable_statistics = true;
+  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED,
+      enable_dictionary, enable_statistics, LARGE_SIZE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndSnappyCompression) {
+  // PLAIN + SNAPPY over LARGE_SIZE rows with the statistics flag on.
+  const bool enable_dictionary = false;
+  const bool enable_statistics = true;
+  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY,
+      enable_dictionary, enable_statistics, LARGE_SIZE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndBrotliCompression) {
+  // PLAIN + BROTLI over LARGE_SIZE rows with the statistics flag on.
+  const bool enable_dictionary = false;
+  const bool enable_statistics = true;
+  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::BROTLI,
+      enable_dictionary, enable_statistics, LARGE_SIZE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) {
+  // PLAIN + GZIP over LARGE_SIZE rows with the statistics flag on.
+  const bool enable_dictionary = false;
+  const bool enable_statistics = true;
+  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP,
+      enable_dictionary, enable_statistics, LARGE_SIZE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, Optional) {
+  // OPTIONAL, non-repeated column: definition levels are supplied,
+  // repetition levels are not.
+  this->SetUpSchema(Repetition::OPTIONAL);
+  this->GenerateData(SMALL_SIZE);
+
+  // Every slot is marked as defined except the one at index 1.
+  std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
+  definition_levels[1] = 0;
+
+  auto writer = this->BuildWriter();
+  writer->WriteBatch(
+      this->values_.size(), definition_levels.data(), nullptr, this->values_ptr_);
+  writer->Close();
+
+  // PARQUET-703
+  ASSERT_EQ(SMALL_SIZE, this->metadata_num_values());
+
+  this->ReadColumn();
+  // One slot was written as undefined, so one value fewer comes back.
+  const int expected_values = SMALL_SIZE - 1;
+  ASSERT_EQ(expected_values, this->values_read_);
+  this->values_out_.resize(expected_values);
+  this->values_.resize(expected_values);
+  ASSERT_EQ(this->values_, this->values_out_);
+}
+
+TYPED_TEST(TestPrimitiveWriter, OptionalSpaced) {
+  // OPTIONAL, non-repeated column written through WriteBatchSpaced:
+  // definition levels plus a validity bitmap, no repetition levels.
+  this->SetUpSchema(Repetition::OPTIONAL);
+  this->GenerateData(SMALL_SIZE);
+
+  std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
+  std::vector<uint8_t> valid_bits(::arrow::BitUtil::BytesForBits(SMALL_SIZE), 255);
+
+  // Mark the last slot and the slot at index 1 as null in both structures.
+  const int last_index = SMALL_SIZE - 1;
+  definition_levels[last_index] = 0;
+  ::arrow::BitUtil::ClearBit(valid_bits.data(), last_index);
+  definition_levels[1] = 0;
+  ::arrow::BitUtil::ClearBit(valid_bits.data(), 1);
+
+  auto writer = this->BuildWriter();
+  writer->WriteBatchSpaced(this->values_.size(), definition_levels.data(), nullptr,
+      valid_bits.data(), 0, this->values_ptr_);
+  writer->Close();
+
+  // PARQUET-703
+  ASSERT_EQ(SMALL_SIZE, this->metadata_num_values());
+
+  this->ReadColumn();
+  const int expected_values = SMALL_SIZE - 2;
+  ASSERT_EQ(expected_values, this->values_read_);
+  this->values_out_.resize(expected_values);
+  // Drop the two null slots from the expected data: the last one first, then
+  // the one at index 1.
+  this->values_.resize(SMALL_SIZE - 1);
+  this->values_.erase(this->values_.begin() + 1);
+  ASSERT_EQ(this->values_, this->values_out_);
+}
+
+TYPED_TEST(TestPrimitiveWriter, Repeated) {
+  // REPEATED column: both definition and repetition levels are supplied.
+  this->SetUpSchema(Repetition::REPEATED);
+  this->GenerateData(SMALL_SIZE);
+
+  // All repetition levels are 0; the slot at index 1 is undefined.
+  std::vector<int16_t> repetition_levels(SMALL_SIZE, 0);
+  std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
+  definition_levels[1] = 0;
+
+  auto writer = this->BuildWriter();
+  writer->WriteBatch(this->values_.size(), definition_levels.data(),
+      repetition_levels.data(), this->values_ptr_);
+  writer->Close();
+
+  this->ReadColumn();
+  const int expected_values = SMALL_SIZE - 1;
+  ASSERT_EQ(expected_values, this->values_read_);
+  this->values_out_.resize(expected_values);
+  this->values_.resize(expected_values);
+  ASSERT_EQ(this->values_, this->values_out_);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredTooFewRows) {
+  // One value fewer than the SMALL_SIZE rows the writer is built for.
+  this->GenerateData(SMALL_SIZE - 1);
+  auto writer = this->BuildWriter();
+
+  const auto num_values = this->values_.size();
+  writer->WriteBatch(num_values, nullptr, nullptr, this->values_ptr_);
+  // The shortfall is expected to surface when the writer is closed.
+  ASSERT_THROW(writer->Close(), ParquetException);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredTooMany) {
+  // Twice as many values as the SMALL_SIZE rows the writer is built for.
+  this->GenerateData(2 * SMALL_SIZE);
+  auto writer = this->BuildWriter();
+
+  const auto num_values = this->values_.size();
+  // The surplus is expected to be rejected by WriteBatch itself.
+  ASSERT_THROW(writer->WriteBatch(num_values, nullptr, nullptr, this->values_ptr_),
+      ParquetException);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RepeatedTooFewRows) {
+  // REPEATED column: both definition and repetition levels are supplied.
+  this->SetUpSchema(Repetition::REPEATED);
+  this->GenerateData(SMALL_SIZE);
+
+  std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
+  std::vector<int16_t> repetition_levels(SMALL_SIZE, 0);
+  definition_levels[1] = 0;
+  // A single non-zero repetition level among SMALL_SIZE levels.
+  repetition_levels[3] = 1;
+
+  auto writer = this->BuildWriter();
+  writer->WriteBatch(this->values_.size(), definition_levels.data(),
+      repetition_levels.data(), this->values_ptr_);
+  // Closing is expected to fail with the levels written above.
+  ASSERT_THROW(writer->Close(), ParquetException);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
+  this->GenerateData(LARGE_SIZE);
+
+  // Required and non-repeated, so neither definition nor repetition levels.
+  auto writer = this->BuildWriter(LARGE_SIZE);
+  const auto num_values = this->values_.size();
+  writer->WriteBatch(num_values, nullptr, nullptr, this->values_ptr_);
+  writer->Close();
+
+  // Only the first SMALL_SIZE rows are read back, which is enough to show
+  // that the chunk is readable.
+  this->ReadColumn();
+  ASSERT_EQ(SMALL_SIZE, this->values_read_);
+  this->values_.resize(SMALL_SIZE);
+  ASSERT_EQ(this->values_, this->values_out_);
+}
+
+// Test case for dictionary fallback encoding
+TYPED_TEST(TestPrimitiveWriter, RequiredVeryLargeChunk) {
+  this->GenerateData(VERY_LARGE_SIZE);
+
+  auto writer = this->BuildWriter(VERY_LARGE_SIZE, Encoding::PLAIN_DICTIONARY);
+  writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
+  writer->Close();
+
+  // Read all rows so we are sure that also the non-dictionary pages are read correctly
+  this->SetupValuesOut(VERY_LARGE_SIZE);
+  this->ReadColumnFully();
+  ASSERT_EQ(VERY_LARGE_SIZE, this->values_read_);
+  this->values_.resize(VERY_LARGE_SIZE);
+  ASSERT_EQ(this->values_, this->values_out_);
+  std::vector<Encoding::type> encodings = this->metadata_encodings();
+  // There are 3 encodings (RLE, PLAIN_DICTIONARY, PLAIN) in a fallback case
+  // Dictionary encoding is not allowed for boolean type
+  // There are 2 encodings (RLE, PLAIN) in a non dictionary encoding case
+  //
+  // Check the vector length before indexing so that a shorter list fails the
+  // test instead of reading past the end of the vector.
+  if (this->type_num() != Type::BOOLEAN) {
+    ASSERT_GE(encodings.size(), 3u);
+    ASSERT_EQ(Encoding::PLAIN_DICTIONARY, encodings[0]);
+    ASSERT_EQ(Encoding::PLAIN, encodings[1]);
+    ASSERT_EQ(Encoding::RLE, encodings[2]);
+  } else {
+    ASSERT_GE(encodings.size(), 2u);
+    ASSERT_EQ(Encoding::PLAIN, encodings[0]);
+    ASSERT_EQ(Encoding::RLE, encodings[1]);
+  }
+}
+
+// PARQUET-719
+// Test case for NULL values
+TEST_F(TestNullValuesWriter, OptionalNullValueChunk) {
+  this->SetUpSchema(Repetition::OPTIONAL);
+  this->GenerateData(LARGE_SIZE);
+
+  // Definition level 0 everywhere: all values being written are NULL, so no
+  // value buffer is passed to the writer.
+  std::vector<int16_t> repetition_levels(LARGE_SIZE, 0);
+  std::vector<int16_t> definition_levels(LARGE_SIZE, 0);
+
+  auto writer = this->BuildWriter(LARGE_SIZE);
+  const auto num_levels = this->values_.size();
+  writer->WriteBatch(
+      num_levels, definition_levels.data(), repetition_levels.data(), nullptr);
+  writer->Close();
+
+  // Only the first SMALL_SIZE rows are read back; none of them holds a value.
+  this->ReadColumn();
+  ASSERT_EQ(0, this->values_read_);
+}
+
+// PARQUET-764
+// Correct bitpacking for boolean write at non-byte boundaries
+using TestBooleanValuesWriter = TestPrimitiveWriter<BooleanType>;
+TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) {
+  this->SetUpSchema(Repetition::REQUIRED);
+  auto writer = this->BuildWriter();
+  // Write one value per WriteBatch call: true at even rows, false at odd rows.
+  for (int row = 0; row < SMALL_SIZE; ++row) {
+    bool value = (row % 2 == 0);
+    writer->WriteBatch(1, nullptr, nullptr, &value);
+  }
+  writer->Close();
+
+  this->ReadColumn();
+  for (int row = 0; row < SMALL_SIZE; ++row) {
+    const bool expected = (row % 2 == 0);
+    ASSERT_EQ(expected, this->values_out_[row]) << row;
+  }
+}
+
+// Appends synthetic levels to input_levels. For every exponent in
+// [min_repeat_factor, max_repeat_factor] the values 0, 1, 3, 7, ... (as long
+// as they do not exceed max_level) are each emitted 2^exponent times in a row.
+void GenerateLevels(int min_repeat_factor, int max_repeat_factor, int max_level,
+    std::vector<int16_t>& input_levels) {
+  for (int exponent = min_repeat_factor; exponent <= max_repeat_factor; ++exponent) {
+    // the run length doubles with every exponent
+    const int run_length = 1 << exponent;
+    // level values are of the form 2^k - 1: 0, 1, 3, 7, ...
+    for (int shift = 0, level = 0; level <= max_level;
+         level = (2 << shift) - 1, ++shift) {
+      for (int n = 0; n < run_length; ++n) {
+        input_levels.push_back(level);
+      }
+    }
+  }
+}
+
+// Encodes num_levels entries of input_levels into bytes, which is resized to
+// 2 * num_levels. For RLE the first sizeof(int32_t) bytes hold the encoded
+// length and the payload follows; for other encodings the payload starts at
+// the beginning of the buffer. Fails the current test if the encoder does not
+// consume every level.
+void EncodeLevels(Encoding::type encoding, int max_level, int num_levels,
+    const int16_t* input_levels, std::vector<uint8_t>& bytes) {
+  LevelEncoder encoder;
+  int levels_count = 0;
+  bytes.resize(2 * num_levels);
+  ASSERT_EQ(2 * num_levels, static_cast<int>(bytes.size()));
+  // encode levels
+  if (encoding == Encoding::RLE) {
+    // leave space to write the rle length value
+    const int prefix_size = static_cast<int>(sizeof(int32_t));
+    ASSERT_GT(static_cast<int>(bytes.size()), prefix_size);
+    // The encoder starts writing after the prefix, so it must only be offered
+    // the bytes that remain behind it; advertising the full buffer size would
+    // let it write past the end of the vector.
+    encoder.Init(encoding, max_level, num_levels, bytes.data() + prefix_size,
+        static_cast<int>(bytes.size()) - prefix_size);
+
+    levels_count = encoder.Encode(num_levels, input_levels);
+    (reinterpret_cast<int32_t*>(bytes.data()))[0] = encoder.len();
+  } else {
+    encoder.Init(
+        encoding, max_level, num_levels, bytes.data(), static_cast<int>(bytes.size()));
+    levels_count = encoder.Encode(num_levels, input_levels);
+  }
+  ASSERT_EQ(num_levels, levels_count);
+}
+
+// Decodes bytes with a single SetData call followed by several Decode calls
+// and checks every decoded level against input_levels.
+void VerifyDecodingLevels(Encoding::type encoding, int max_level,
+    std::vector<int16_t>& input_levels, std::vector<uint8_t>& bytes) {
+  const int num_levels = static_cast<int>(input_levels.size());
+  std::vector<int16_t> output_levels(num_levels);
+  ASSERT_EQ(num_levels, static_cast<int>(output_levels.size()));
+
+  LevelDecoder decoder;
+  decoder.SetData(encoding, max_level, num_levels, bytes.data());
+
+  // Decode in four equally sized chunks; every chunk is decoded into the start
+  // of output_levels and compared against its slice of the input.
+  const int decode_count = 4;
+  const int chunk_size = num_levels / decode_count;
+  for (int chunk = 0; chunk < decode_count; ++chunk) {
+    const int first = chunk * chunk_size;
+    const int decoded = decoder.Decode(chunk_size, output_levels.data());
+    ASSERT_EQ(chunk_size, decoded);
+    for (int i = 0; i < chunk_size; ++i) {
+      EXPECT_EQ(input_levels[first + i], output_levels[i]);
+    }
+  }
+
+  // Whatever the integer division left over is decoded in one final call.
+  const int consumed = decode_count * chunk_size;
+  const int leftover = num_levels - consumed;
+  if (leftover > 0) {
+    const int decoded = decoder.Decode(leftover, output_levels.data());
+    ASSERT_EQ(leftover, decoded);
+    for (int i = 0; i < leftover; ++i) {
+      EXPECT_EQ(input_levels[consumed + i], output_levels[i]);
+    }
+  }
+
+  // Once everything has been consumed, Decode must report zero levels.
+  ASSERT_EQ(0, decoder.Decode(1, output_levels.data()));
+}
+
+// Decodes each buffer in bytes with its own SetData call on one shared decoder
+// and checks the result against the matching slice of input_levels.
+void VerifyDecodingMultipleSetData(Encoding::type encoding, int max_level,
+    std::vector<int16_t>& input_levels, std::vector<std::vector<uint8_t>>& bytes) {
+  // Every buffer holds an equal share of the input levels.
+  const int setdata_count = static_cast<int>(bytes.size());
+  const int levels_per_buffer = static_cast<int>(input_levels.size()) / setdata_count;
+
+  std::vector<int16_t> output_levels(levels_per_buffer);
+  LevelDecoder decoder;
+  for (int buffer = 0; buffer < setdata_count; ++buffer) {
+    const int first = buffer * levels_per_buffer;
+    ASSERT_EQ(levels_per_buffer, static_cast<int>(output_levels.size()));
+    decoder.SetData(encoding, max_level, levels_per_buffer, bytes[buffer].data());
+    const int decoded = decoder.Decode(levels_per_buffer, output_levels.data());
+    ASSERT_EQ(levels_per_buffer, decoded);
+    for (int i = 0; i < levels_per_buffer; ++i) {
+      EXPECT_EQ(input_levels[first + i], output_levels[i]);
+    }
+  }
+}
+
+// Test levels with maximum bit-width from 1 to 8
+// increase the repetition count for each iteration by a factor of 2
+TEST(TestLevels, TestLevelsDecodeMultipleBitWidth) {
+  int min_repeat_factor = 0;
+  const int max_repeat_factor = 7;  // 128
+  const int max_bit_width = 8;
+  std::vector<int16_t> input_levels;
+  std::vector<uint8_t> bytes;
+  const Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED};
+
+  for (const Encoding::type encoding : encodings) {
+    // BIT_PACKED requires a sequence of at least 8
+    if (encoding == Encoding::BIT_PACKED) { min_repeat_factor = 3; }
+    for (int bit_width = 1; bit_width <= max_bit_width; ++bit_width) {
+      // largest level representable with the current bit width
+      const int max_level = (1 << bit_width) - 1;
+      GenerateLevels(min_repeat_factor, max_repeat_factor, max_level, input_levels);
+      EncodeLevels(encoding, max_level, static_cast<int>(input_levels.size()),
+          input_levels.data(), bytes);
+      VerifyDecodingLevels(encoding, max_level, input_levels, bytes);
+      // start from an empty input for the next bit width
+      input_levels.clear();
+    }
+  }
+}
+
+// Test multiple decoder SetData calls
+TEST(TestLevels, TestLevelsDecodeMultipleSetData) {
+  const int min_repeat_factor = 3;
+  const int max_repeat_factor = 7;  // 128
+  const int bit_width = 8;
+  const int max_level = (1 << bit_width) - 1;
+  const Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED};
+
+  std::vector<int16_t> input_levels;
+  GenerateLevels(min_repeat_factor, max_repeat_factor, max_level, input_levels);
+
+  // Split the input into setdata_factor equally sized parts, each of which is
+  // encoded into a buffer of its own.
+  const int setdata_factor = 8;
+  const int num_levels = static_cast<int>(input_levels.size());
+  const int split_level_size = num_levels / setdata_factor;
+  std::vector<std::vector<uint8_t>> bytes(setdata_factor);
+
+  for (const Encoding::type encoding : encodings) {
+    for (int part = 0; part < setdata_factor; ++part) {
+      const int16_t* part_levels = input_levels.data() + part * split_level_size;
+      EncodeLevels(encoding, max_level, split_level_size, part_levels, bytes[part]);
+    }
+    VerifyDecodingMultipleSetData(encoding, max_level, input_levels, bytes);
+  }
+}
+
+TEST(TestLevelEncoder, MinimumBufferSize) {
+  // PARQUET-676, PARQUET-698
+  const int kNumToEncode = 1024;
+
+  // Every ninth level is 0, all the others are 1.
+  std::vector<int16_t> levels;
+  for (int i = 0; i < kNumToEncode; ++i) {
+    levels.push_back((i % 9 == 0) ? 0 : 1);
+  }
+
+  // The output buffer is exactly as large as MaxBufferSize reports.
+  const int buffer_size = LevelEncoder::MaxBufferSize(Encoding::RLE, 1, kNumToEncode);
+  std::vector<uint8_t> output(buffer_size);
+
+  LevelEncoder encoder;
+  encoder.Init(
+      Encoding::RLE, 1, kNumToEncode, output.data(), static_cast<int>(output.size()));
+  const int encode_count = encoder.Encode(kNumToEncode, levels.data());
+
+  ASSERT_EQ(kNumToEncode, encode_count);
+}
+
+TEST(TestLevelEncoder, MinimumBufferSize2) {
+  // PARQUET-708
+  // Test the worst case for bit_width=2 consisting of
+  // LiteralRun(size=8)
+  // RepeatedRun(size=8)
+  // LiteralRun(size=8)
+  // ...
+  const int kNumToEncode = 1024;
+
+  // In every group of 16 levels the first seven are 0 and the remaining nine
+  // are 1: this forces a literal run of 00000001 followed by eight 1s.
+  std::vector<int16_t> levels;
+  for (int i = 0; i < kNumToEncode; ++i) {
+    const bool leading_zero = (i % 16) < 7;
+    levels.push_back(leading_zero ? 0 : 1);
+  }
+
+  // NOTE(review): bit_width is passed in the position where Init() and
+  // MaxBufferSize() take max_level, so this loop presumably covers max levels
+  // 1..8 rather than bit widths 1..8 - confirm the intent.
+  for (int bit_width = 1; bit_width <= 8; ++bit_width) {
+    const int buffer_size =
+        LevelEncoder::MaxBufferSize(Encoding::RLE, bit_width, kNumToEncode);
+    std::vector<uint8_t> output(buffer_size);
+
+    LevelEncoder encoder;
+    encoder.Init(Encoding::RLE, bit_width, kNumToEncode, output.data(),
+        static_cast<int>(output.size()));
+    const int encode_count = encoder.Encode(kNumToEncode, levels.data());
+
+    ASSERT_EQ(kNumToEncode, encode_count);
+  }
+}
+
+} // namespace test
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
new file mode 100644
index 0000000..c13d4a0
--- /dev/null
+++ b/src/parquet/column_writer.cc
@@ -0,0 +1,597 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/column_writer.h"
+
+#include "parquet/encoding-internal.h"
+#include "parquet/properties.h"
+#include "parquet/statistics.h"
+#include "parquet/util/logging.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/rle-encoding.h"
+
+namespace parquet {
+
+LevelEncoder::LevelEncoder() {}
+LevelEncoder::~LevelEncoder() {}
+
+void LevelEncoder::Init(Encoding::type encoding, int16_t max_level,
+ int num_buffered_values, uint8_t* data, int data_size) {
+ bit_width_ = BitUtil::Log2(max_level + 1);
+ encoding_ = encoding;
+ switch (encoding) {
+ case Encoding::RLE: {
+ rle_encoder_.reset(new RleEncoder(data, data_size, bit_width_));
+ break;
+ }
+ case Encoding::BIT_PACKED: {
+ int num_bytes =
+ static_cast<int>(BitUtil::Ceil(num_buffered_values * bit_width_, 8));
+ bit_packed_encoder_.reset(new BitWriter(data, num_bytes));
+ break;
+ }
+ default:
+ throw ParquetException("Unknown encoding type for levels.");
+ }
+}
+
+// Returns the number of bytes to reserve for encoding `num_buffered_values`
+// levels whose largest value is `max_level` with the given level encoding.
+// Throws ParquetException for encodings other than RLE and BIT_PACKED.
+int LevelEncoder::MaxBufferSize(
+    Encoding::type encoding, int16_t max_level, int num_buffered_values) {
+  const int level_bits = BitUtil::Log2(max_level + 1);
+
+  if (encoding == Encoding::RLE) {
+    // TODO: Due to the way we currently check if the buffer is full enough,
+    // we need to have MinBufferSize as head room.
+    return RleEncoder::MaxBufferSize(level_bits, num_buffered_values) +
+           RleEncoder::MinBufferSize(level_bits);
+  }
+
+  if (encoding == Encoding::BIT_PACKED) {
+    return static_cast<int>(BitUtil::Ceil(num_buffered_values * level_bits, 8));
+  }
+
+  throw ParquetException("Unknown encoding type for levels.");
+}
+
+// Feeds up to `batch_size` entries of `levels` to the encoder selected in Init()
+// and returns how many were accepted. Encoding stops at the first level the
+// underlying encoder rejects. The encoder is flushed afterwards; for RLE the
+// encoded length is recorded so that len() can report it.
+// Throws ParquetException if no encoder has been created yet.
+int LevelEncoder::Encode(int batch_size, const int16_t* levels) {
+  if (!rle_encoder_ && !bit_packed_encoder_) {
+    throw ParquetException("Level encoders are not initialized.");
+  }
+
+  int accepted = 0;
+  if (encoding_ == Encoding::RLE) {
+    while (accepted < batch_size && rle_encoder_->Put(levels[accepted])) {
+      ++accepted;
+    }
+    rle_encoder_->Flush();
+    rle_length_ = rle_encoder_->len();
+  } else {
+    while (accepted < batch_size &&
+           bit_packed_encoder_->PutValue(levels[accepted], bit_width_)) {
+      ++accepted;
+    }
+    bit_packed_encoder_->Flush();
+  }
+  return accepted;
+}
+
+// ----------------------------------------------------------------------
+// ColumnWriter
+
+std::shared_ptr<WriterProperties> default_writer_properties() {
+ static std::shared_ptr<WriterProperties> default_writer_properties =
+ WriterProperties::Builder().build();
+ return default_writer_properties;
+}
+
+ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
+ std::unique_ptr<PageWriter> pager, int64_t expected_rows, bool has_dictionary,
+ Encoding::type encoding, const WriterProperties* properties)
+ : metadata_(metadata),
+ descr_(metadata->descr()),
+ pager_(std::move(pager)),
+ expected_rows_(expected_rows),
+ has_dictionary_(has_dictionary),
+ encoding_(encoding),
+ properties_(properties),
+ allocator_(properties->memory_pool()),
+ pool_(properties->memory_pool()),
+ num_buffered_values_(0),
+ num_buffered_encoded_values_(0),
+ num_rows_(0),
+ total_bytes_written_(0),
+ closed_(false),
+ fallback_(false) {
+ definition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
+ repetition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
+ definition_levels_rle_ =
+ std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+ repetition_levels_rle_ =
+ std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+ uncompressed_data_ =
+ std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+ if (pager_->has_compressor()) {
+ compressed_data_ =
+ std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+ }
+}
+
+void ColumnWriter::InitSinks() {
+ definition_levels_sink_->Clear();
+ repetition_levels_sink_->Clear();
+}
+
+void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
+ DCHECK(!closed_);
+ definition_levels_sink_->Write(
+ reinterpret_cast<const uint8_t*>(levels), sizeof(int16_t) * num_levels);
+}
+
+void ColumnWriter::WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) {
+ DCHECK(!closed_);
+ repetition_levels_sink_->Write(
+ reinterpret_cast<const uint8_t*>(levels), sizeof(int16_t) * num_levels);
+}
+
+// return the size of the encoded buffer
+int64_t ColumnWriter::RleEncodeLevels(
+ const Buffer& src_buffer, ResizableBuffer* dest_buffer, int16_t max_level) {
+ // TODO: This only works due to some RLE specifics
+ int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
+ static_cast<int>(num_buffered_values_)) +
+ sizeof(int32_t);
+
+ // Use Arrow::Buffer::shrink_to_fit = false
+ // underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
+ PARQUET_THROW_NOT_OK(dest_buffer->Resize(rle_size, false));
+
+ level_encoder_.Init(Encoding::RLE, max_level, static_cast<int>(num_buffered_values_),
+ dest_buffer->mutable_data() + sizeof(int32_t),
+ static_cast<int>(dest_buffer->size()) - sizeof(int32_t));
+ int encoded = level_encoder_.Encode(static_cast<int>(num_buffered_values_),
+ reinterpret_cast<const int16_t*>(src_buffer.data()));
+ DCHECK_EQ(encoded, num_buffered_values_);
+ reinterpret_cast<int32_t*>(dest_buffer->mutable_data())[0] = level_encoder_.len();
+ int64_t encoded_size = level_encoder_.len() + sizeof(int32_t);
+ return encoded_size;
+}
+
+void ColumnWriter::AddDataPage() {
+ int64_t definition_levels_rle_size = 0;
+ int64_t repetition_levels_rle_size = 0;
+
+ std::shared_ptr<Buffer> values = GetValuesBuffer();
+
+ if (descr_->max_definition_level() > 0) {
+ definition_levels_rle_size = RleEncodeLevels(definition_levels_sink_->GetBufferRef(),
+ definition_levels_rle_.get(), descr_->max_definition_level());
+ }
+
+ if (descr_->max_repetition_level() > 0) {
+ repetition_levels_rle_size = RleEncodeLevels(repetition_levels_sink_->GetBufferRef(),
+ repetition_levels_rle_.get(), descr_->max_repetition_level());
+ }
+
+ int64_t uncompressed_size =
+ definition_levels_rle_size + repetition_levels_rle_size + values->size();
+
+ // Use Arrow::Buffer::shrink_to_fit = false
+ // underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
+ PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false));
+
+ // Concatenate data into a single buffer
+ uint8_t* uncompressed_ptr = uncompressed_data_->mutable_data();
+ memcpy(uncompressed_ptr, repetition_levels_rle_->data(), repetition_levels_rle_size);
+ uncompressed_ptr += repetition_levels_rle_size;
+ memcpy(uncompressed_ptr, definition_levels_rle_->data(), definition_levels_rle_size);
+ uncompressed_ptr += definition_levels_rle_size;
+ memcpy(uncompressed_ptr, values->data(), values->size());
+
+ EncodedStatistics page_stats = GetPageStatistics();
+ ResetPageStatistics();
+
+ std::shared_ptr<Buffer> compressed_data;
+ if (pager_->has_compressor()) {
+ pager_->Compress(*(uncompressed_data_.get()), compressed_data_.get());
+ compressed_data = compressed_data_;
+ } else {
+ compressed_data = uncompressed_data_;
+ }
+
+ // Write the page to OutputStream eagerly if there is no dictionary or
+ // if dictionary encoding has fallen back to PLAIN
+ if (has_dictionary_ && !fallback_) { // Save pages until end of dictionary encoding
+ std::shared_ptr<Buffer> compressed_data_copy;
+ PARQUET_THROW_NOT_OK(compressed_data->Copy(
+ 0, compressed_data->size(), allocator_, &compressed_data_copy));
+ CompressedDataPage page(compressed_data_copy,
+ static_cast<int32_t>(num_buffered_values_), encoding_, Encoding::RLE,
+ Encoding::RLE, uncompressed_size, page_stats);
+ data_pages_.push_back(std::move(page));
+ } else { // Eagerly write pages
+ CompressedDataPage page(compressed_data, static_cast<int32_t>(num_buffered_values_),
+ encoding_, Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats);
+ WriteDataPage(page);
+ }
+
+ // Re-initialize the sinks for next Page.
+ InitSinks();
+ num_buffered_values_ = 0;
+ num_buffered_encoded_values_ = 0;
+}
+
+void ColumnWriter::WriteDataPage(const CompressedDataPage& page) {
+ total_bytes_written_ += pager_->WriteDataPage(page);
+}
+
+// Finalizes the column chunk. On the first call this writes the dictionary page
+// (if dictionary encoding is in use and has not fallen back), flushes all
+// buffered data pages, stores the chunk statistics in the metadata when they are
+// set, and closes the page writer. Later calls skip that work.
+//
+// On every call the number of rows written is compared with expected_rows_ and
+// a ParquetException is thrown on mismatch.
+//
+// Returns the total number of bytes written for this column chunk.
+int64_t ColumnWriter::Close() {
+  if (!closed_) {
+    closed_ = true;
+    if (has_dictionary_ && !fallback_) { WriteDictionaryPage(); }
+
+    FlushBufferedDataPages();
+
+    EncodedStatistics chunk_statistics = GetChunkStatistics();
+    if (chunk_statistics.is_set()) metadata_->SetStatistics(chunk_statistics);
+    pager_->Close(has_dictionary_, fallback_);
+  }
+
+  if (num_rows_ != expected_rows_) {
+    std::stringstream ss;
+    // Leading space keeps the row count separate from the trailing text.
+    ss << "Written rows: " << num_rows_ << " != expected rows: " << expected_rows_
+       << " in the current column chunk";
+    throw ParquetException(ss.str());
+  }
+
+  return total_bytes_written_;
+}
+
+// Turns any still-buffered values into a data page, then hands every retained
+// data page to WriteDataPage() in order and empties the retained list.
+void ColumnWriter::FlushBufferedDataPages() {
+  // Write all outstanding data to a new page
+  if (num_buffered_values_ > 0) { AddDataPage(); }
+
+  for (const CompressedDataPage& buffered_page : data_pages_) {
+    WriteDataPage(buffered_page);
+  }
+  data_pages_.clear();
+}
+
+// ----------------------------------------------------------------------
+// TypedColumnWriter
+
+template <typename Type>
+TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
+ std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
+ const WriterProperties* properties)
+ : ColumnWriter(metadata, std::move(pager), expected_rows,
+ (encoding == Encoding::PLAIN_DICTIONARY ||
+ encoding == Encoding::RLE_DICTIONARY),
+ encoding, properties) {
+ switch (encoding) {
+ case Encoding::PLAIN:
+ current_encoder_.reset(new PlainEncoder<Type>(descr_, properties->memory_pool()));
+ break;
+ case Encoding::PLAIN_DICTIONARY:
+ case Encoding::RLE_DICTIONARY:
+ current_encoder_.reset(
+ new DictEncoder<Type>(descr_, &pool_, properties->memory_pool()));
+ break;
+ default:
+ ParquetException::NYI("Selected encoding is not supported");
+ }
+
+ if (properties->statistics_enabled(descr_->path())) {
+ page_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
+ chunk_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
+ }
+}
+
+// Only one Dictionary Page is written.
+// Fallback to PLAIN if dictionary page limit is reached.
+template <typename Type>
+void TypedColumnWriter<Type>::CheckDictionarySizeLimit() {
+ auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
+ if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) {
+ WriteDictionaryPage();
+ // Serialize the buffered Dictionary Indices
+ FlushBufferedDataPages();
+ fallback_ = true;
+ // Only PLAIN encoding is supported for fallback in V1
+ current_encoder_.reset(new PlainEncoder<Type>(descr_, properties_->memory_pool()));
+ encoding_ = Encoding::PLAIN;
+ }
+}
+
+template <typename Type>
+void TypedColumnWriter<Type>::WriteDictionaryPage() {
+ auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
+ std::shared_ptr<PoolBuffer> buffer =
+ AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
+ dict_encoder->WriteDict(buffer->mutable_data());
+ // TODO Get rid of this deep call
+ dict_encoder->mem_pool()->FreeAll();
+
+ DictionaryPage page(
+ buffer, dict_encoder->num_entries(), properties_->dictionary_index_encoding());
+ total_bytes_written_ += pager_->WriteDictionaryPage(page);
+}
+
+// Returns the encoded statistics of the current page, or a default-constructed
+// EncodedStatistics when no page statistics object exists.
+template <typename Type>
+EncodedStatistics TypedColumnWriter<Type>::GetPageStatistics() {
+  if (!page_statistics_) { return EncodedStatistics(); }
+  return page_statistics_->Encode();
+}
+
+// Returns the encoded statistics of the whole chunk, or a default-constructed
+// EncodedStatistics when no chunk statistics object exists.
+template <typename Type>
+EncodedStatistics TypedColumnWriter<Type>::GetChunkStatistics() {
+  if (!chunk_statistics_) { return EncodedStatistics(); }
+  return chunk_statistics_->Encode();
+}
+
+template <typename Type>
+void TypedColumnWriter<Type>::ResetPageStatistics() {
+ if (chunk_statistics_ != nullptr) {
+ chunk_statistics_->Merge(*page_statistics_);
+ page_statistics_->Reset();
+ }
+}
+
+// ----------------------------------------------------------------------
+// Dynamic column writer constructor
+
+// Creates the TypedColumnWriter that matches the physical type of the column
+// described by `metadata`.
+//
+// The encoding is looked up in `properties` for the column path. When dictionary
+// encoding is enabled for that path and the column is not BOOLEAN, the
+// dictionary page encoding from `properties` is used instead.
+//
+// For a physical type without a case below, ParquetException::NYI is invoked.
+std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata,
+    std::unique_ptr<PageWriter> pager, int64_t expected_rows,
+    const WriterProperties* properties) {
+  const ColumnDescriptor* descr = metadata->descr();
+  Encoding::type encoding = properties->encoding(descr->path());
+  if (properties->dictionary_enabled(descr->path()) &&
+      descr->physical_type() != Type::BOOLEAN) {
+    encoding = properties->dictionary_page_encoding();
+  }
+  switch (descr->physical_type()) {
+    case Type::BOOLEAN:
+      return std::make_shared<BoolWriter>(
+          metadata, std::move(pager), expected_rows, encoding, properties);
+    case Type::INT32:
+      return std::make_shared<Int32Writer>(
+          metadata, std::move(pager), expected_rows, encoding, properties);
+    case Type::INT64:
+      return std::make_shared<Int64Writer>(
+          metadata, std::move(pager), expected_rows, encoding, properties);
+    case Type::INT96:
+      return std::make_shared<Int96Writer>(
+          metadata, std::move(pager), expected_rows, encoding, properties);
+    case Type::FLOAT:
+      return std::make_shared<FloatWriter>(
+          metadata, std::move(pager), expected_rows, encoding, properties);
+    case Type::DOUBLE:
+      return std::make_shared<DoubleWriter>(
+          metadata, std::move(pager), expected_rows, encoding, properties);
+    case Type::BYTE_ARRAY:
+      return std::make_shared<ByteArrayWriter>(
+          metadata, std::move(pager), expected_rows, encoding, properties);
+    case Type::FIXED_LEN_BYTE_ARRAY:
+      return std::make_shared<FixedLenByteArrayWriter>(
+          metadata, std::move(pager), expected_rows, encoding, properties);
+    default:
+      // This factory builds writers, so the message names the writer.
+      ParquetException::NYI("type writer not implemented");
+  }
+  // Unreachable code, but suppress compiler warning
+  return std::shared_ptr<ColumnWriter>(nullptr);
+}
+
+// ----------------------------------------------------------------------
+// TypedColumnWriter member definitions and explicit template instantiations
+
+template <typename DType>
+inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
+ const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
+ int64_t values_to_write = 0;
+ // If the field is required and non-repeated, there are no definition levels
+ if (descr_->max_definition_level() > 0) {
+ for (int64_t i = 0; i < num_values; ++i) {
+ if (def_levels[i] == descr_->max_definition_level()) { ++values_to_write; }
+ }
+
+ WriteDefinitionLevels(num_values, def_levels);
+ } else {
+ // Required field, write all values
+ values_to_write = num_values;
+ }
+
+ // Not present for non-repeated fields
+ if (descr_->max_repetition_level() > 0) {
+ // A row could include more than one value
+ // Count the occasions where we start a new row
+ for (int64_t i = 0; i < num_values; ++i) {
+ if (rep_levels[i] == 0) { num_rows_++; }
+ }
+
+ WriteRepetitionLevels(num_values, rep_levels);
+ } else {
+ // Each value is exactly one row
+ num_rows_ += static_cast<int>(num_values);
+ }
+
+ if (num_rows_ > expected_rows_) {
+ throw ParquetException("More rows were written in the column chunk than expected");
+ }
+
+ // PARQUET-780
+ if (values_to_write > 0) { DCHECK(nullptr != values) << "Values ptr cannot be NULL"; }
+
+ WriteValues(values_to_write, values);
+
+ if (page_statistics_ != nullptr) {
+ page_statistics_->Update(values, values_to_write, num_values - values_to_write);
+ }
+
+ num_buffered_values_ += num_values;
+ num_buffered_encoded_values_ += values_to_write;
+
+ if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
+ AddDataPage();
+ }
+ if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); }
+
+ return values_to_write;
+}
+
+template <typename DType>
+inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(int64_t num_values,
+ const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+ int64_t valid_bits_offset, const T* values, int64_t* num_spaced_written) {
+ int64_t values_to_write = 0;
+ int64_t spaced_values_to_write = 0;
+ // If the field is required and non-repeated, there are no definition levels
+ if (descr_->max_definition_level() > 0) {
+ // Minimal definition level for which spaced values are written
+ int16_t min_spaced_def_level = descr_->max_definition_level();
+ if (descr_->schema_node()->is_optional()) { min_spaced_def_level--; }
+ for (int64_t i = 0; i < num_values; ++i) {
+ if (def_levels[i] == descr_->max_definition_level()) { ++values_to_write; }
+ if (def_levels[i] >= min_spaced_def_level) { ++spaced_values_to_write; }
+ }
+
+ WriteDefinitionLevels(num_values, def_levels);
+ } else {
+ // Required field, write all values
+ values_to_write = num_values;
+ spaced_values_to_write = num_values;
+ }
+
+ // Not present for non-repeated fields
+ if (descr_->max_repetition_level() > 0) {
+ // A row could include more than one value
+ // Count the occasions where we start a new row
+ for (int64_t i = 0; i < num_values; ++i) {
+ if (rep_levels[i] == 0) { num_rows_++; }
+ }
+
+ WriteRepetitionLevels(num_values, rep_levels);
+ } else {
+ // Each value is exactly one row
+ num_rows_ += static_cast<int>(num_values);
+ }
+
+ if (num_rows_ > expected_rows_) {
+ throw ParquetException("More rows were written in the column chunk than expected");
+ }
+
+ if (descr_->schema_node()->is_optional()) {
+ WriteValuesSpaced(spaced_values_to_write, valid_bits, valid_bits_offset, values);
+ } else {
+ WriteValues(values_to_write, values);
+ }
+ *num_spaced_written = spaced_values_to_write;
+
+ if (page_statistics_ != nullptr) {
+ page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, values_to_write,
+ num_values - values_to_write);
+ }
+
+ num_buffered_values_ += num_values;
+ num_buffered_encoded_values_ += values_to_write;
+
+ if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
+ AddDataPage();
+ }
+ if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); }
+
+ return values_to_write;
+}
+
+// Writes `num_values` levels (and the values they refer to) by splitting the
+// input into mini batches of properties_->write_batch_size() levels, followed by
+// one mini batch holding the remainder.
+//
+// `def_levels` and `rep_levels` are advanced by the number of levels consumed;
+// `values` is advanced by the count returned from WriteMiniBatch(), since the
+// two can differ.
+template <typename DType>
+void TypedColumnWriter<DType>::WriteBatch(int64_t num_values, const int16_t* def_levels,
+    const int16_t* rep_levels, const T* values) {
+  // We check for DataPage limits only after we have inserted the values. If a user
+  // writes a large number of values, the DataPage size can be much above the limit.
+  // The purpose of this chunking is to bound this. Even if a user writes large number
+  // of values, the chunking will ensure the AddDataPage() is called at a reasonable
+  // pagesize limit
+  const int64_t write_batch_size = properties_->write_batch_size();
+  // Kept as int64_t: narrowing the quotient to int could truncate.
+  const int64_t num_batches = num_values / write_batch_size;
+  const int64_t num_remaining = num_values % write_batch_size;
+  int64_t value_offset = 0;
+  for (int64_t round = 0; round < num_batches; round++) {
+    const int64_t offset = round * write_batch_size;
+    // Named distinctly so it does not shadow the `num_values` parameter.
+    const int64_t values_written = WriteMiniBatch(write_batch_size, &def_levels[offset],
+        &rep_levels[offset], &values[value_offset]);
+    value_offset += values_written;
+  }
+  // Write the remaining values
+  const int64_t offset = num_batches * write_batch_size;
+  WriteMiniBatch(
+      num_remaining, &def_levels[offset], &rep_levels[offset], &values[value_offset]);
+}
+
+// Spaced counterpart of WriteBatch(): writes `num_values` levels in mini batches
+// of properties_->write_batch_size() levels plus one remainder batch.
+//
+// After each mini batch, both the `values` pointer and the bit offset into
+// `valid_bits` are advanced by the spaced count reported through
+// WriteMiniBatchSpaced()'s `num_spaced_written` out-parameter.
+template <typename DType>
+void TypedColumnWriter<DType>::WriteBatchSpaced(int64_t num_values,
+    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+    int64_t valid_bits_offset, const T* values) {
+  // We check for DataPage limits only after we have inserted the values. If a user
+  // writes a large number of values, the DataPage size can be much above the limit.
+  // The purpose of this chunking is to bound this. Even if a user writes large number
+  // of values, the chunking will ensure the AddDataPage() is called at a reasonable
+  // pagesize limit
+  const int64_t write_batch_size = properties_->write_batch_size();
+  // Kept as int64_t: narrowing the quotient to int could truncate.
+  const int64_t num_batches = num_values / write_batch_size;
+  const int64_t num_remaining = num_values % write_batch_size;
+  int64_t num_spaced_written = 0;
+  int64_t values_offset = 0;
+  for (int64_t round = 0; round < num_batches; round++) {
+    const int64_t offset = round * write_batch_size;
+    WriteMiniBatchSpaced(write_batch_size, &def_levels[offset], &rep_levels[offset],
+        valid_bits, valid_bits_offset + values_offset, values + values_offset,
+        &num_spaced_written);
+    values_offset += num_spaced_written;
+  }
+  // Write the remaining values
+  const int64_t offset = num_batches * write_batch_size;
+  WriteMiniBatchSpaced(num_remaining, &def_levels[offset], &rep_levels[offset],
+      valid_bits, valid_bits_offset + values_offset, values + values_offset,
+      &num_spaced_written);
+}
+
+template <typename DType>
+void TypedColumnWriter<DType>::WriteValues(int64_t num_values, const T* values) {
+ current_encoder_->Put(values, static_cast<int>(num_values));
+}
+
+template <typename DType>
+void TypedColumnWriter<DType>::WriteValuesSpaced(int64_t num_values,
+ const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values) {
+ current_encoder_->PutSpaced(
+ values, static_cast<int>(num_values), valid_bits, valid_bits_offset);
+}
+
+template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<BooleanType>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int32Type>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int64Type>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int96Type>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<FloatType>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<DoubleType>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<ByteArrayType>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<FLBAType>;
+
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column_writer.h b/src/parquet/column_writer.h
new file mode 100644
index 0000000..4e113de
--- /dev/null
+++ b/src/parquet/column_writer.h
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_COLUMN_WRITER_H
+#define PARQUET_COLUMN_WRITER_H
+
+#include <vector>
+
+#include "parquet/column_page.h"
+#include "parquet/encoding.h"
+#include "parquet/file/metadata.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+#include "parquet/statistics.h"
+#include "parquet/types.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/visibility.h"
+
+namespace parquet {
+
+class BitWriter;
+class RleEncoder;
+
+// Encodes repetition/definition levels into a caller-supplied buffer using
+// either RLE or BIT_PACKED encoding. Init() must be called before Encode().
+class PARQUET_EXPORT LevelEncoder {
+ public:
+  LevelEncoder();
+  ~LevelEncoder();
+
+  // Number of bytes to reserve for encoding num_buffered_values levels whose
+  // largest value is max_level.
+  static int MaxBufferSize(
+      Encoding::type encoding, int16_t max_level, int num_buffered_values);
+
+  // Initialize the LevelEncoder.
+  void Init(Encoding::type encoding, int16_t max_level, int num_buffered_values,
+      uint8_t* data, int data_size);
+
+  // Encodes a batch of levels from an array and returns the number of levels encoded
+  int Encode(int batch_size, const int16_t* levels);
+
+  // Length recorded by the last RLE Encode() call.
+  // Throws ParquetException when the current encoding is not RLE.
+  int32_t len() {
+    if (encoding_ != Encoding::RLE) {
+      throw ParquetException("Only implemented for RLE encoding");
+    }
+    return rle_length_;
+  }
+
+ private:
+  // Default member initializers give these fields defined values before Init()
+  // runs; the out-of-line constructor does not set them.
+  int bit_width_ = 0;
+  int rle_length_ = 0;
+  // Placeholder until Init() selects RLE or BIT_PACKED; makes len() throw
+  // instead of reading an indeterminate value.
+  Encoding::type encoding_ = Encoding::PLAIN;
+  std::unique_ptr<RleEncoder> rle_encoder_;
+  std::unique_ptr<BitWriter> bit_packed_encoder_;
+};
+
+static constexpr int WRITE_BATCH_SIZE = 1000;
+// Base class for writing a single column chunk. It buffers levels, assembles
+// data pages and forwards them to a PageWriter. Value encoding, dictionary
+// handling and statistics are supplied by subclasses through the pure virtual
+// hooks below.
+class PARQUET_EXPORT ColumnWriter {
+ public:
+  ColumnWriter(ColumnChunkMetaDataBuilder*, std::unique_ptr<PageWriter>,
+      int64_t expected_rows, bool has_dictionary, Encoding::type encoding,
+      const WriterProperties* properties);
+
+  // Virtual so that destroying a subclass through a ColumnWriter pointer runs
+  // the subclass destructor.
+  virtual ~ColumnWriter() = default;
+
+  static std::shared_ptr<ColumnWriter> Make(ColumnChunkMetaDataBuilder*,
+      std::unique_ptr<PageWriter>, int64_t expected_rows,
+      const WriterProperties* properties);
+
+  Type::type type() const { return descr_->physical_type(); }
+
+  const ColumnDescriptor* descr() const { return descr_; }
+
+  /**
+   * Closes the ColumnWriter, commits any buffered values to pages.
+   *
+   * @return Total size of the column in bytes
+   */
+  int64_t Close();
+
+ protected:
+  virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
+
+  // Serializes Dictionary Page if enabled
+  virtual void WriteDictionaryPage() = 0;
+
+  // Checks if the Dictionary Page size limit is reached
+  // If the limit is reached, the Dictionary and Data Pages are serialized
+  // The encoding is switched to PLAIN
+
+  virtual void CheckDictionarySizeLimit() = 0;
+
+  // Plain-encoded statistics of the current page
+  virtual EncodedStatistics GetPageStatistics() = 0;
+
+  // Plain-encoded statistics of the whole chunk
+  virtual EncodedStatistics GetChunkStatistics() = 0;
+
+  // Merges page statistics into chunk statistics, then resets the values
+  virtual void ResetPageStatistics() = 0;
+
+  // Adds Data Pages to an in memory buffer in dictionary encoding mode
+  // Serializes the Data Pages in other encoding modes
+  void AddDataPage();
+
+  // Serializes Data Pages
+  void WriteDataPage(const CompressedDataPage& page);
+
+  // Write multiple definition levels
+  void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels);
+
+  // Write multiple repetition levels
+  void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels);
+
+  // RLE encode the src_buffer into dest_buffer and return the encoded size
+  int64_t RleEncodeLevels(
+      const Buffer& src_buffer, ResizableBuffer* dest_buffer, int16_t max_level);
+
+  // Serialize the buffered Data Pages
+  void FlushBufferedDataPages();
+
+  ColumnChunkMetaDataBuilder* metadata_;
+  const ColumnDescriptor* descr_;
+
+  std::unique_ptr<PageWriter> pager_;
+
+  // The number of rows that should be written in this column chunk.
+  int64_t expected_rows_;
+  bool has_dictionary_;
+  Encoding::type encoding_;
+  const WriterProperties* properties_;
+
+  LevelEncoder level_encoder_;
+
+  ::arrow::MemoryPool* allocator_;
+  ChunkedAllocator pool_;
+
+  // The total number of values stored in the data page. This is the maximum of
+  // the number of encoded definition levels or encoded values. For
+  // non-repeated, required columns, this is equal to the number of encoded
+  // values. For repeated or optional values, there may be fewer data values
+  // than levels, and this tells you how many encoded levels there are in that
+  // case.
+  int64_t num_buffered_values_;
+
+  // The total number of stored values. For repeated or optional values, this
+  // number may be lower than num_buffered_values_.
+  int64_t num_buffered_encoded_values_;
+
+  // Total number of rows written with this ColumnWriter.
+  // Same width as expected_rows_, which it is compared against.
+  int64_t num_rows_;
+
+  // Records the total number of bytes written by the serializer
+  int64_t total_bytes_written_;
+
+  // Flag to check if the Writer has been closed
+  bool closed_;
+
+  // Flag to infer if dictionary encoding has fallen back to PLAIN
+  bool fallback_;
+
+  std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
+  std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
+
+  std::shared_ptr<ResizableBuffer> definition_levels_rle_;
+  std::shared_ptr<ResizableBuffer> repetition_levels_rle_;
+
+  std::shared_ptr<ResizableBuffer> uncompressed_data_;
+  std::shared_ptr<ResizableBuffer> compressed_data_;
+
+  std::vector<CompressedDataPage> data_pages_;
+
+ private:
+  // Clears both level sinks so the next page starts empty.
+  void InitSinks();
+};
+
+// API to write values to a single column. This is the main client facing API.
+template <typename DType>
+class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
+ public:
+ typedef typename DType::c_type T;
+
+ TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
+ std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
+ const WriterProperties* properties);
+
+ // Write a batch of repetition levels, definition levels, and values to the
+ // column.
+ void WriteBatch(int64_t num_values, const int16_t* def_levels,
+ const int16_t* rep_levels, const T* values);
+
+ /// Write a batch of repetition levels, definition levels, and values to the
+ /// column.
+ ///
+ /// In comparison to WriteBatch the length of repetition and definition levels
+ /// is the same as of the number of values read for max_definition_level == 1.
+ /// In the case of max_definition_level > 1, the repetition and definition
+ /// levels are larger than the values but the values include the null entries
+ /// with definition_level == (max_definition_level - 1). Thus we have to differentiate
+ /// in the parameters of this function if the input has the length of num_values or the
+ /// _number of rows in the lowest nesting level_.
+ ///
+ /// In the case that the most inner node in the Parquet is required, the _number of rows
+ /// in the lowest nesting level_ is equal to the number of non-null values. If the
+ /// inner-most schema node is optional, the _number of rows in the lowest nesting level_
+ /// also includes all values with definition_level == (max_definition_level - 1).
+ ///
+ /// @param num_values number of levels to write.
+ /// @param def_levels The Parquet definition levels, length is num_values
+ /// @param rep_levels The Parquet repetition levels, length is num_values
+ /// @param valid_bits Bitmap that indicates if the row is null on the lowest nesting
+ /// level. The length is number of rows in the lowest nesting level.
+ /// @param valid_bits_offset The offset in bits of the valid_bits where the
+ /// first relevant bit resides.
+ /// @param values The values in the lowest nested level including
+ /// spacing for nulls on the lowest levels; input has the length
+ /// of the number of rows on the lowest nesting level.
+ void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
+ const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
+ const T* values);
+
+ protected:
+ std::shared_ptr<Buffer> GetValuesBuffer() override {
+ return current_encoder_->FlushValues();
+ }
+ void WriteDictionaryPage() override;
+ void CheckDictionarySizeLimit() override;
+ EncodedStatistics GetPageStatistics() override;
+ EncodedStatistics GetChunkStatistics() override;
+ void ResetPageStatistics() override;
+
+ private:
+ int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
+ const int16_t* rep_levels, const T* values);
+
+ int64_t WriteMiniBatchSpaced(int64_t num_values, const int16_t* def_levels,
+ const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
+ const T* values, int64_t* num_spaced_written);
+
+ typedef Encoder<DType> EncoderType;
+
+ // Write values to a temporary buffer before they are encoded into pages
+ void WriteValues(int64_t num_values, const T* values);
+ void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits,
+ int64_t valid_bits_offset, const T* values);
+ std::unique_ptr<EncoderType> current_encoder_;
+
+ typedef TypedRowGroupStatistics<DType> TypedStats;
+ std::unique_ptr<TypedStats> page_statistics_;
+ std::unique_ptr<TypedStats> chunk_statistics_;
+};
+
+typedef TypedColumnWriter<BooleanType> BoolWriter;
+typedef TypedColumnWriter<Int32Type> Int32Writer;
+typedef TypedColumnWriter<Int64Type> Int64Writer;
+typedef TypedColumnWriter<Int96Type> Int96Writer;
+typedef TypedColumnWriter<FloatType> FloatWriter;
+typedef TypedColumnWriter<DoubleType> DoubleWriter;
+typedef TypedColumnWriter<ByteArrayType> ByteArrayWriter;
+typedef TypedColumnWriter<FLBAType> FixedLenByteArrayWriter;
+
+extern template class PARQUET_EXPORT TypedColumnWriter<BooleanType>;
+extern template class PARQUET_EXPORT TypedColumnWriter<Int32Type>;
+extern template class PARQUET_EXPORT TypedColumnWriter<Int64Type>;
+extern template class PARQUET_EXPORT TypedColumnWriter<Int96Type>;
+extern template class PARQUET_EXPORT TypedColumnWriter<FloatType>;
+extern template class PARQUET_EXPORT TypedColumnWriter<DoubleType>;
+extern template class PARQUET_EXPORT TypedColumnWriter<ByteArrayType>;
+extern template class PARQUET_EXPORT TypedColumnWriter<FLBAType>;
+
+} // namespace parquet
+
+#endif // PARQUET_COLUMN_WRITER_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/encoding-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h
index 7e78123..61b8e24 100644
--- a/src/parquet/encoding-internal.h
+++ b/src/parquet/encoding-internal.h
@@ -464,9 +464,8 @@ class DictEncoder : public Encoder<DType> {
// reserve
// an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used
// but not reserving them would cause the encoder to fail.
- return 1 +
- RleEncoder::MaxBufferSize(
- bit_width(), static_cast<int>(buffered_indices_.size())) +
+ return 1 + RleEncoder::MaxBufferSize(
+ bit_width(), static_cast<int>(buffered_indices_.size())) +
RleEncoder::MinBufferSize(bit_width());
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/file-deserialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc
index 7c4690e..59d2051 100644
--- a/src/parquet/file/file-deserialize-test.cc
+++ b/src/parquet/file/file-deserialize-test.cc
@@ -26,7 +26,7 @@
#include <string>
#include <vector>
-#include "parquet/column/page.h"
+#include "parquet/column_page.h"
#include "parquet/exception.h"
#include "parquet/file/reader-internal.h"
#include "parquet/parquet_types.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/file-metadata-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc
index a4a2016..10ce40c 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/file/file-metadata-test.cc
@@ -15,9 +15,9 @@
// specific language governing permissions and limitations
// under the License.
-#include "parquet/column/statistics.h"
#include "parquet/file/metadata.h"
#include "parquet/schema.h"
+#include "parquet/statistics.h"
#include "parquet/types.h"
#include <gtest/gtest.h>
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/file-serialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc
index 7a90eeb..5736fa1 100644
--- a/src/parquet/file/file-serialize-test.cc
+++ b/src/parquet/file/file-serialize-test.cc
@@ -17,12 +17,12 @@
#include <gtest/gtest.h>
-#include "parquet/column/reader.h"
-#include "parquet/column/test-specialization.h"
-#include "parquet/column/test-util.h"
-#include "parquet/column/writer.h"
+#include "parquet/column_reader.h"
+#include "parquet/column_writer.h"
#include "parquet/file/reader.h"
#include "parquet/file/writer.h"
+#include "parquet/test-specialization.h"
+#include "parquet/test-util.h"
#include "parquet/types.h"
#include "parquet/util/memory.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/metadata.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 50d2114..2dc50d1 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -24,9 +24,9 @@
#include "arrow/util/key_value_metadata.h"
-#include "parquet/column/properties.h"
-#include "parquet/column/statistics.h"
+#include "parquet/properties.h"
#include "parquet/schema.h"
+#include "parquet/statistics.h"
#include "parquet/types.h"
#include "parquet/util/memory.h"
#include "parquet/util/visibility.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/printer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/printer.cc b/src/parquet/file/printer.cc
index e398c3a..52b2598 100644
--- a/src/parquet/file/printer.cc
+++ b/src/parquet/file/printer.cc
@@ -20,7 +20,7 @@
#include <string>
#include <vector>
-#include "parquet/column/scanner.h"
+#include "parquet/column_scanner.h"
using std::string;
using std::vector;
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index 1d9ab47..c39d3eb 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -26,7 +26,7 @@
#include "arrow/util/compression.h"
-#include "parquet/column/page.h"
+#include "parquet/column_page.h"
#include "parquet/exception.h"
#include "parquet/schema.h"
#include "parquet/thrift.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/reader-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h
index 1ac2384..2667fa8 100644
--- a/src/parquet/file/reader-internal.h
+++ b/src/parquet/file/reader-internal.h
@@ -22,11 +22,11 @@
#include <memory>
#include <vector>
-#include "parquet/column/page.h"
-#include "parquet/column/properties.h"
+#include "parquet/column_page.h"
#include "parquet/file/metadata.h"
#include "parquet/file/reader.h"
#include "parquet/parquet_types.h"
+#include "parquet/properties.h"
#include "parquet/types.h"
#include "parquet/util/memory.h"
#include "parquet/util/visibility.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index 7bf2c76..d3247cb 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -25,9 +25,9 @@
#include "arrow/io/file.h"
-#include "parquet/column/page.h"
-#include "parquet/column/reader.h"
-#include "parquet/column/scanner.h"
+#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
+#include "parquet/column_scanner.h"
#include "parquet/exception.h"
#include "parquet/file/reader-internal.h"
#include "parquet/types.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h
index 7d3c3f9..1cd287c 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file/reader.h
@@ -25,11 +25,11 @@
#include <string>
#include <vector>
-#include "parquet/column/page.h"
-#include "parquet/column/properties.h"
-#include "parquet/column/statistics.h"
+#include "parquet/column_page.h"
#include "parquet/file/metadata.h"
+#include "parquet/properties.h"
#include "parquet/schema.h"
+#include "parquet/statistics.h"
#include "parquet/util/memory.h"
#include "parquet/util/visibility.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index bb24737..1cceb95 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -22,7 +22,7 @@
#include "arrow/util/compression.h"
-#include "parquet/column/writer.h"
+#include "parquet/column_writer.h"
#include "parquet/schema-internal.h"
#include "parquet/schema.h"
#include "parquet/thrift.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 6ac7927..447579a 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -21,7 +21,7 @@
#include <memory>
#include <vector>
-#include "parquet/column/page.h"
+#include "parquet/column_page.h"
#include "parquet/file/metadata.h"
#include "parquet/file/writer.h"
#include "parquet/parquet_types.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h
index 7d48720..b22281a 100644
--- a/src/parquet/file/writer.h
+++ b/src/parquet/file/writer.h
@@ -21,8 +21,8 @@
#include <cstdint>
#include <memory>
-#include "parquet/column/properties.h"
#include "parquet/file/metadata.h"
+#include "parquet/properties.h"
#include "parquet/schema.h"
#include "parquet/util/memory.h"
#include "parquet/util/visibility.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/properties-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/properties-test.cc b/src/parquet/properties-test.cc
new file mode 100644
index 0000000..0e6d725
--- /dev/null
+++ b/src/parquet/properties-test.cc
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <string>
+
+#include "parquet/file/reader.h"
+#include "parquet/properties.h"
+
+namespace parquet {
+
+using schema::ColumnPath;
+
+namespace test {
+
+TEST(TestReaderProperties, Basics) {
+ ReaderProperties props;
+
+ ASSERT_EQ(DEFAULT_BUFFER_SIZE, props.buffer_size());
+ ASSERT_EQ(DEFAULT_USE_BUFFERED_STREAM, props.is_buffered_stream_enabled());
+}
+
+TEST(TestWriterProperties, Basics) {
+ std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
+
+ ASSERT_EQ(DEFAULT_PAGE_SIZE, props->data_pagesize());
+ ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT, props->dictionary_pagesize_limit());
+ ASSERT_EQ(DEFAULT_WRITER_VERSION, props->version());
+}
+
+TEST(TestWriterProperties, AdvancedHandling) {
+ WriterProperties::Builder builder;
+ builder.compression("gzip", Compression::GZIP);
+ builder.compression(Compression::SNAPPY);
+ builder.encoding(Encoding::DELTA_BINARY_PACKED);
+ builder.encoding("delta-length", Encoding::DELTA_LENGTH_BYTE_ARRAY);
+ std::shared_ptr<WriterProperties> props = builder.build();
+
+ ASSERT_EQ(Compression::GZIP, props->compression(ColumnPath::FromDotString("gzip")));
+ ASSERT_EQ(
+ Compression::SNAPPY, props->compression(ColumnPath::FromDotString("delta-length")));
+ ASSERT_EQ(
+ Encoding::DELTA_BINARY_PACKED, props->encoding(ColumnPath::FromDotString("gzip")));
+ ASSERT_EQ(Encoding::DELTA_LENGTH_BYTE_ARRAY,
+ props->encoding(ColumnPath::FromDotString("delta-length")));
+}
+
+} // namespace test
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/properties.h
----------------------------------------------------------------------
diff --git a/src/parquet/properties.h b/src/parquet/properties.h
new file mode 100644
index 0000000..3ebc3b7
--- /dev/null
+++ b/src/parquet/properties.h
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_COLUMN_PROPERTIES_H
+#define PARQUET_COLUMN_PROPERTIES_H
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include "parquet/exception.h"
+#include "parquet/parquet_version.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/visibility.h"
+
+namespace parquet {
+
+struct ParquetVersion {
+ enum type { PARQUET_1_0, PARQUET_2_0 };
+};
+
+static int64_t DEFAULT_BUFFER_SIZE = 0;
+static bool DEFAULT_USE_BUFFERED_STREAM = false;
+
+class PARQUET_EXPORT ReaderProperties {
+ public:
+ explicit ReaderProperties(::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+ : pool_(pool) {
+ buffered_stream_enabled_ = DEFAULT_USE_BUFFERED_STREAM;
+ buffer_size_ = DEFAULT_BUFFER_SIZE;
+ }
+
+ ::arrow::MemoryPool* memory_pool() const { return pool_; }
+
+ std::unique_ptr<InputStream> GetStream(
+ RandomAccessSource* source, int64_t start, int64_t num_bytes) {
+ std::unique_ptr<InputStream> stream;
+ if (buffered_stream_enabled_) {
+ stream.reset(
+ new BufferedInputStream(pool_, buffer_size_, source, start, num_bytes));
+ } else {
+ stream.reset(new InMemoryInputStream(source, start, num_bytes));
+ }
+ return stream;
+ }
+
+ bool is_buffered_stream_enabled() const { return buffered_stream_enabled_; }
+
+ void enable_buffered_stream() { buffered_stream_enabled_ = true; }
+
+ void disable_buffered_stream() { buffered_stream_enabled_ = false; }
+
+ void set_buffer_size(int64_t buf_size) { buffer_size_ = buf_size; }
+
+ int64_t buffer_size() const { return buffer_size_; }
+
+ private:
+ ::arrow::MemoryPool* pool_;
+ int64_t buffer_size_;
+ bool buffered_stream_enabled_;
+};
+
+ReaderProperties PARQUET_EXPORT default_reader_properties();
+
+static constexpr int64_t DEFAULT_PAGE_SIZE = 1024 * 1024;
+static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
+static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = DEFAULT_PAGE_SIZE;
+static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
+static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
+static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
+static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
+ ParquetVersion::PARQUET_1_0;
+static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
+static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED;
+
+class PARQUET_EXPORT ColumnProperties {
+ public:
+ ColumnProperties(Encoding::type encoding = DEFAULT_ENCODING,
+ Compression::type codec = DEFAULT_COMPRESSION_TYPE,
+ bool dictionary_enabled = DEFAULT_IS_DICTIONARY_ENABLED,
+ bool statistics_enabled = DEFAULT_ARE_STATISTICS_ENABLED)
+ : encoding(encoding),
+ codec(codec),
+ dictionary_enabled(dictionary_enabled),
+ statistics_enabled(statistics_enabled) {}
+
+ Encoding::type encoding;
+ Compression::type codec;
+ bool dictionary_enabled;
+ bool statistics_enabled;
+};
+
+class PARQUET_EXPORT WriterProperties {
+ public:
+ class Builder {
+ public:
+ Builder()
+ : pool_(::arrow::default_memory_pool()),
+ dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
+ write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
+ pagesize_(DEFAULT_PAGE_SIZE),
+ version_(DEFAULT_WRITER_VERSION),
+ created_by_(DEFAULT_CREATED_BY) {}
+ virtual ~Builder() {}
+
+ Builder* memory_pool(::arrow::MemoryPool* pool) {
+ pool_ = pool;
+ return this;
+ }
+
+ Builder* enable_dictionary() {
+ default_column_properties_.dictionary_enabled = true;
+ return this;
+ }
+
+ Builder* disable_dictionary() {
+ default_column_properties_.dictionary_enabled = false;
+ return this;
+ }
+
+ Builder* enable_dictionary(const std::string& path) {
+ dictionary_enabled_[path] = true;
+ return this;
+ }
+
+ Builder* enable_dictionary(const std::shared_ptr<schema::ColumnPath>& path) {
+ return this->enable_dictionary(path->ToDotString());
+ }
+
+ Builder* disable_dictionary(const std::string& path) {
+ dictionary_enabled_[path] = false;
+ return this;
+ }
+
+ Builder* disable_dictionary(const std::shared_ptr<schema::ColumnPath>& path) {
+ return this->disable_dictionary(path->ToDotString());
+ }
+
+ Builder* dictionary_pagesize_limit(int64_t dictionary_psize_limit) {
+ dictionary_pagesize_limit_ = dictionary_psize_limit;
+ return this;
+ }
+
+ Builder* write_batch_size(int64_t write_batch_size) {
+ write_batch_size_ = write_batch_size;
+ return this;
+ }
+
+ Builder* data_pagesize(int64_t pg_size) {
+ pagesize_ = pg_size;
+ return this;
+ }
+
+ Builder* version(ParquetVersion::type version) {
+ version_ = version;
+ return this;
+ }
+
+ Builder* created_by(const std::string& created_by) {
+ created_by_ = created_by;
+ return this;
+ }
+
+ /**
+ * Define the encoding that is used when we don't utilise dictionary encoding.
+ *
+ * This applies either if dictionary encoding is disabled or if we fall back
+ * because the dictionary grew too large.
+ */
+ Builder* encoding(Encoding::type encoding_type) {
+ if (encoding_type == Encoding::PLAIN_DICTIONARY ||
+ encoding_type == Encoding::RLE_DICTIONARY) {
+ throw ParquetException("Can't use dictionary encoding as fallback encoding");
+ }
+
+ default_column_properties_.encoding = encoding_type;
+ return this;
+ }
+
+ /**
+ * Define the encoding that is used when we don't utilise dictionary encoding.
+ *
+ * This applies either if dictionary encoding is disabled or if we fall back
+ * because the dictionary grew too large.
+ */
+ Builder* encoding(const std::string& path, Encoding::type encoding_type) {
+ if (encoding_type == Encoding::PLAIN_DICTIONARY ||
+ encoding_type == Encoding::RLE_DICTIONARY) {
+ throw ParquetException("Can't use dictionary encoding as fallback encoding");
+ }
+
+ encodings_[path] = encoding_type;
+ return this;
+ }
+
+ /**
+ * Define the encoding that is used when we don't utilise dictionary encoding.
+ *
+ * This applies either if dictionary encoding is disabled or if we fall back
+ * because the dictionary grew too large.
+ */
+ Builder* encoding(
+ const std::shared_ptr<schema::ColumnPath>& path, Encoding::type encoding_type) {
+ return this->encoding(path->ToDotString(), encoding_type);
+ }
+
+ Builder* compression(Compression::type codec) {
+ default_column_properties_.codec = codec;
+ return this;
+ }
+
+ Builder* compression(const std::string& path, Compression::type codec) {
+ codecs_[path] = codec;
+ return this;
+ }
+
+ Builder* compression(
+ const std::shared_ptr<schema::ColumnPath>& path, Compression::type codec) {
+ return this->compression(path->ToDotString(), codec);
+ }
+
+ Builder* enable_statistics() {
+ default_column_properties_.statistics_enabled = true;
+ return this;
+ }
+
+ Builder* disable_statistics() {
+ default_column_properties_.statistics_enabled = false;
+ return this;
+ }
+
+ Builder* enable_statistics(const std::string& path) {
+ statistics_enabled_[path] = true;
+ return this;
+ }
+
+ Builder* enable_statistics(const std::shared_ptr<schema::ColumnPath>& path) {
+ return this->enable_statistics(path->ToDotString());
+ }
+
+ Builder* disable_statistics(const std::string& path) {
+ statistics_enabled_[path] = false;
+ return this;
+ }
+
+ Builder* disable_statistics(const std::shared_ptr<schema::ColumnPath>& path) {
+ return this->disable_statistics(path->ToDotString());
+ }
+
+ std::shared_ptr<WriterProperties> build() {
+ std::unordered_map<std::string, ColumnProperties> column_properties;
+ auto get = [&](const std::string& key) -> ColumnProperties& {
+ auto it = column_properties.find(key);
+ if (it == column_properties.end())
+ return column_properties[key] = default_column_properties_;
+ else
+ return it->second;
+ };
+
+ for (const auto& item : encodings_)
+ get(item.first).encoding = item.second;
+ for (const auto& item : codecs_)
+ get(item.first).codec = item.second;
+ for (const auto& item : dictionary_enabled_)
+ get(item.first).dictionary_enabled = item.second;
+ for (const auto& item : statistics_enabled_)
+ get(item.first).statistics_enabled = item.second;
+
+ return std::shared_ptr<WriterProperties>(new WriterProperties(pool_,
+ dictionary_pagesize_limit_, write_batch_size_, pagesize_, version_, created_by_,
+ default_column_properties_, column_properties));
+ }
+
+ private:
+ ::arrow::MemoryPool* pool_;
+ int64_t dictionary_pagesize_limit_;
+ int64_t write_batch_size_;
+ int64_t pagesize_;
+ ParquetVersion::type version_;
+ std::string created_by_;
+
+ // Settings used for each column unless overridden in any of the maps below
+ ColumnProperties default_column_properties_;
+ std::unordered_map<std::string, Encoding::type> encodings_;
+ std::unordered_map<std::string, Compression::type> codecs_;
+ std::unordered_map<std::string, bool> dictionary_enabled_;
+ std::unordered_map<std::string, bool> statistics_enabled_;
+ };
+
+ inline ::arrow::MemoryPool* memory_pool() const { return pool_; }
+
+ inline int64_t dictionary_pagesize_limit() const { return dictionary_pagesize_limit_; }
+
+ inline int64_t write_batch_size() const { return write_batch_size_; }
+
+ inline int64_t data_pagesize() const { return pagesize_; }
+
+ inline ParquetVersion::type version() const { return parquet_version_; }
+
+ inline std::string created_by() const { return parquet_created_by_; }
+
+ inline Encoding::type dictionary_index_encoding() const {
+ if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
+ return Encoding::PLAIN_DICTIONARY;
+ } else {
+ return Encoding::RLE_DICTIONARY;
+ }
+ }
+
+ inline Encoding::type dictionary_page_encoding() const {
+ if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
+ return Encoding::PLAIN_DICTIONARY;
+ } else {
+ return Encoding::PLAIN;
+ }
+ }
+
+ const ColumnProperties& column_properties(
+ const std::shared_ptr<schema::ColumnPath>& path) const {
+ auto it = column_properties_.find(path->ToDotString());
+ if (it != column_properties_.end()) return it->second;
+ return default_column_properties_;
+ }
+
+ Encoding::type encoding(const std::shared_ptr<schema::ColumnPath>& path) const {
+ return column_properties(path).encoding;
+ }
+
+ Compression::type compression(const std::shared_ptr<schema::ColumnPath>& path) const {
+ return column_properties(path).codec;
+ }
+
+ bool dictionary_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
+ return column_properties(path).dictionary_enabled;
+ }
+
+ bool statistics_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
+ return column_properties(path).statistics_enabled;
+ }
+
+ private:
+ explicit WriterProperties(::arrow::MemoryPool* pool, int64_t dictionary_pagesize_limit,
+ int64_t write_batch_size, int64_t pagesize, ParquetVersion::type version,
+ const std::string& created_by, const ColumnProperties& default_column_properties,
+ const std::unordered_map<std::string, ColumnProperties>& column_properties)
+ : pool_(pool),
+ dictionary_pagesize_limit_(dictionary_pagesize_limit),
+ write_batch_size_(write_batch_size),
+ pagesize_(pagesize),
+ parquet_version_(version),
+ parquet_created_by_(created_by),
+ default_column_properties_(default_column_properties),
+ column_properties_(column_properties) {}
+
+ ::arrow::MemoryPool* pool_;
+ int64_t dictionary_pagesize_limit_;
+ int64_t write_batch_size_;
+ int64_t pagesize_;
+ ParquetVersion::type parquet_version_;
+ std::string parquet_created_by_;
+ ColumnProperties default_column_properties_;
+ std::unordered_map<std::string, ColumnProperties> column_properties_;
+};
+
+std::shared_ptr<WriterProperties> PARQUET_EXPORT default_writer_properties();
+
+} // namespace parquet
+
+#endif // PARQUET_COLUMN_PROPERTIES_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index fcce38b..cb40abb 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -25,8 +25,8 @@
#include "arrow/io/file.h"
-#include "parquet/column/reader.h"
-#include "parquet/column/scanner.h"
+#include "parquet/column_reader.h"
+#include "parquet/column_scanner.h"
#include "parquet/file/printer.h"
#include "parquet/file/reader-internal.h"
#include "parquet/file/reader.h"