Posted to commits@parquet.apache.org by uw...@apache.org on 2017/06/26 07:05:25 UTC

[2/6] parquet-cpp git commit: PARQUET-858: Flatten column directory, minor code consolidation

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
new file mode 100644
index 0000000..798c7ba
--- /dev/null
+++ b/src/parquet/column_writer-test.cc
@@ -0,0 +1,729 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "parquet/column_reader.h"
+#include "parquet/column_writer.h"
+#include "parquet/file/reader-internal.h"
+#include "parquet/file/writer-internal.h"
+#include "parquet/test-specialization.h"
+#include "parquet/test-util.h"
+#include "parquet/types.h"
+#include "parquet/util/comparison.h"
+#include "parquet/util/memory.h"
+
+namespace parquet {
+
+using schema::NodePtr;
+using schema::PrimitiveNode;
+
+namespace test {
+
+// The default size used in most tests.
+const int SMALL_SIZE = 100;
+// Larger size to test some corner cases; only used in a few specific tests.
+const int LARGE_SIZE = 100000;
+// Very large size to test dictionary fallback.
+const int VERY_LARGE_SIZE = 400000;
+
+template <typename TestType>
+class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
+ public:
+  typedef typename TestType::c_type T;
+
+  void SetUp() {
+    this->SetupValuesOut(SMALL_SIZE);
+    writer_properties_ = default_writer_properties();
+    definition_levels_out_.resize(SMALL_SIZE);
+    repetition_levels_out_.resize(SMALL_SIZE);
+
+    this->SetUpSchema(Repetition::REQUIRED);
+
+    descr_ = this->schema_.Column(0);
+  }
+
+  Type::type type_num() { return TestType::type_num; }
+
+  void BuildReader(
+      int64_t num_rows, Compression::type compression = Compression::UNCOMPRESSED) {
+    auto buffer = sink_->GetBuffer();
+    std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
+    std::unique_ptr<SerializedPageReader> page_reader(
+        new SerializedPageReader(std::move(source), num_rows, compression));
+    reader_.reset(new TypedColumnReader<TestType>(this->descr_, std::move(page_reader)));
+  }
+
+  std::shared_ptr<TypedColumnWriter<TestType>> BuildWriter(
+      int64_t output_size = SMALL_SIZE,
+      const ColumnProperties& column_properties = ColumnProperties()) {
+    sink_.reset(new InMemoryOutputStream());
+    metadata_ = ColumnChunkMetaDataBuilder::Make(
+        writer_properties_, this->descr_, reinterpret_cast<uint8_t*>(&thrift_metadata_));
+    std::unique_ptr<SerializedPageWriter> pager(
+        new SerializedPageWriter(sink_.get(), column_properties.codec, metadata_.get()));
+    WriterProperties::Builder wp_builder;
+    if (column_properties.encoding == Encoding::PLAIN_DICTIONARY ||
+        column_properties.encoding == Encoding::RLE_DICTIONARY) {
+      wp_builder.enable_dictionary();
+    } else {
+      wp_builder.disable_dictionary();
+      wp_builder.encoding(column_properties.encoding);
+    }
+    writer_properties_ = wp_builder.build();
+    std::shared_ptr<ColumnWriter> writer = ColumnWriter::Make(
+        metadata_.get(), std::move(pager), output_size, writer_properties_.get());
+    return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
+  }
+
+  void ReadColumn(Compression::type compression = Compression::UNCOMPRESSED) {
+    BuildReader(static_cast<int64_t>(this->values_out_.size()), compression);
+    reader_->ReadBatch(static_cast<int>(this->values_out_.size()),
+        definition_levels_out_.data(), repetition_levels_out_.data(),
+        this->values_out_ptr_, &values_read_);
+    this->SyncValuesOut();
+  }
+
+  void ReadColumnFully(Compression::type compression = Compression::UNCOMPRESSED);
+
+  void TestRequiredWithEncoding(Encoding::type encoding) {
+    return TestRequiredWithSettings(encoding, Compression::UNCOMPRESSED, false, false);
+  }
+
+  void TestRequiredWithSettings(Encoding::type encoding, Compression::type compression,
+      bool enable_dictionary, bool enable_statistics, int64_t num_rows = SMALL_SIZE) {
+    this->GenerateData(num_rows);
+
+    this->WriteRequiredWithSettings(
+        encoding, compression, enable_dictionary, enable_statistics, num_rows);
+    this->ReadAndCompare(compression, num_rows);
+
+    this->WriteRequiredWithSettingsSpaced(
+        encoding, compression, enable_dictionary, enable_statistics, num_rows);
+    this->ReadAndCompare(compression, num_rows);
+  }
+
+  void WriteRequiredWithSettings(Encoding::type encoding, Compression::type compression,
+      bool enable_dictionary, bool enable_statistics, int64_t num_rows) {
+    ColumnProperties column_properties(
+        encoding, compression, enable_dictionary, enable_statistics);
+    std::shared_ptr<TypedColumnWriter<TestType>> writer =
+        this->BuildWriter(num_rows, column_properties);
+    writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
+    // The behaviour should be independent of the number of Close() calls
+    writer->Close();
+    writer->Close();
+  }
+
+  void WriteRequiredWithSettingsSpaced(Encoding::type encoding,
+      Compression::type compression, bool enable_dictionary, bool enable_statistics,
+      int64_t num_rows) {
+    std::vector<uint8_t> valid_bits(
+        BitUtil::RoundUpNumBytes(static_cast<uint32_t>(this->values_.size())) + 1, 255);
+    ColumnProperties column_properties(
+        encoding, compression, enable_dictionary, enable_statistics);
+    std::shared_ptr<TypedColumnWriter<TestType>> writer =
+        this->BuildWriter(num_rows, column_properties);
+    writer->WriteBatchSpaced(
+        this->values_.size(), nullptr, nullptr, valid_bits.data(), 0, this->values_ptr_);
+    // The behaviour should be independent of the number of Close() calls
+    writer->Close();
+    writer->Close();
+  }
+
+  void ReadAndCompare(Compression::type compression, int64_t num_rows) {
+    this->SetupValuesOut(num_rows);
+    this->ReadColumnFully(compression);
+    Compare<T> compare(this->descr_);
+    for (size_t i = 0; i < this->values_.size(); i++) {
+      if (compare(this->values_[i], this->values_out_[i]) ||
+          compare(this->values_out_[i], this->values_[i])) {
+        std::cout << "Failed at " << i << std::endl;
+      }
+      ASSERT_FALSE(compare(this->values_[i], this->values_out_[i]));
+      ASSERT_FALSE(compare(this->values_out_[i], this->values_[i]));
+    }
+    ASSERT_EQ(this->values_, this->values_out_);
+  }
+
+  int64_t metadata_num_values() {
+    // Metadata accessor must be created lazily.
+    // This is because the ColumnChunkMetaData semantics dictate that the metadata
+    // object is complete (no changes to the metadata buffer can be made after
+    // instantiation).
+    auto metadata_accessor = ColumnChunkMetaData::Make(
+        reinterpret_cast<const uint8_t*>(&thrift_metadata_), this->descr_);
+    return metadata_accessor->num_values();
+  }
+
+  std::vector<Encoding::type> metadata_encodings() {
+    // Metadata accessor must be created lazily.
+    // This is because the ColumnChunkMetaData semantics dictate that the metadata
+    // object is complete (no changes to the metadata buffer can be made after
+    // instantiation).
+    auto metadata_accessor = ColumnChunkMetaData::Make(
+        reinterpret_cast<const uint8_t*>(&thrift_metadata_), this->descr_);
+    return metadata_accessor->encodings();
+  }
+
+ protected:
+  int64_t values_read_;
+  // Keep the reader alive, since for ByteArray columns the lifetime of the
+  // ByteArray contents is bound to the reader.
+  std::unique_ptr<TypedColumnReader<TestType>> reader_;
+
+  std::vector<int16_t> definition_levels_out_;
+  std::vector<int16_t> repetition_levels_out_;
+
+  const ColumnDescriptor* descr_;
+
+ private:
+  format::ColumnChunk thrift_metadata_;
+  std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
+  std::unique_ptr<InMemoryOutputStream> sink_;
+  std::shared_ptr<WriterProperties> writer_properties_;
+  std::vector<std::vector<uint8_t>> data_buffer_;
+};
+
+template <typename TestType>
+void TestPrimitiveWriter<TestType>::ReadColumnFully(Compression::type compression) {
+  int64_t total_values = static_cast<int64_t>(this->values_out_.size());
+  BuildReader(total_values, compression);
+  values_read_ = 0;
+  while (values_read_ < total_values) {
+    int64_t values_read_recently = 0;
+    reader_->ReadBatch(
+        static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_),
+        definition_levels_out_.data() + values_read_,
+        repetition_levels_out_.data() + values_read_,
+        this->values_out_ptr_ + values_read_, &values_read_recently);
+    values_read_ += values_read_recently;
+  }
+  this->SyncValuesOut();
+}
+
+template <>
+void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compression) {
+  int64_t total_values = static_cast<int64_t>(this->values_out_.size());
+  BuildReader(total_values, compression);
+  this->data_buffer_.clear();
+
+  values_read_ = 0;
+  while (values_read_ < total_values) {
+    int64_t values_read_recently = 0;
+    reader_->ReadBatch(
+        static_cast<int>(this->values_out_.size()) - static_cast<int>(values_read_),
+        definition_levels_out_.data() + values_read_,
+        repetition_levels_out_.data() + values_read_,
+        this->values_out_ptr_ + values_read_, &values_read_recently);
+
+    // Copy contents of the pointers
+    std::vector<uint8_t> data(values_read_recently * this->descr_->type_length());
+    uint8_t* data_ptr = data.data();
+    for (int64_t i = 0; i < values_read_recently; i++) {
+      memcpy(data_ptr + this->descr_->type_length() * i,
+          this->values_out_[i + values_read_].ptr, this->descr_->type_length());
+      this->values_out_[i + values_read_].ptr =
+          data_ptr + this->descr_->type_length() * i;
+    }
+    data_buffer_.emplace_back(std::move(data));
+
+    values_read_ += values_read_recently;
+  }
+  this->SyncValuesOut();
+}
+
+typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
+    BooleanType, ByteArrayType, FLBAType>
+    TestTypes;
+
+TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);
+
+using TestNullValuesWriter = TestPrimitiveWriter<Int32Type>;
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
+  this->TestRequiredWithEncoding(Encoding::PLAIN);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredDictionary) {
+  this->TestRequiredWithEncoding(Encoding::PLAIN_DICTIONARY);
+}
+
+/*
+TYPED_TEST(TestPrimitiveWriter, RequiredRLE) {
+  this->TestRequiredWithEncoding(Encoding::RLE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredBitPacked) {
+  this->TestRequiredWithEncoding(Encoding::BIT_PACKED);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredDeltaBinaryPacked) {
+  this->TestRequiredWithEncoding(Encoding::DELTA_BINARY_PACKED);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredDeltaLengthByteArray) {
+  this->TestRequiredWithEncoding(Encoding::DELTA_LENGTH_BYTE_ARRAY);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredDeltaByteArray) {
+  this->TestRequiredWithEncoding(Encoding::DELTA_BYTE_ARRAY);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
+  this->TestRequiredWithEncoding(Encoding::RLE_DICTIONARY);
+}
+*/
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithSnappyCompression) {
+  this->TestRequiredWithSettings(
+      Encoding::PLAIN, Compression::SNAPPY, false, false, LARGE_SIZE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithBrotliCompression) {
+  this->TestRequiredWithSettings(
+      Encoding::PLAIN, Compression::BROTLI, false, false, LARGE_SIZE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompression) {
+  this->TestRequiredWithSettings(
+      Encoding::PLAIN, Compression::GZIP, false, false, LARGE_SIZE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) {
+  this->TestRequiredWithSettings(
+      Encoding::PLAIN, Compression::UNCOMPRESSED, false, true, LARGE_SIZE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndSnappyCompression) {
+  this->TestRequiredWithSettings(
+      Encoding::PLAIN, Compression::SNAPPY, false, true, LARGE_SIZE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndBrotliCompression) {
+  this->TestRequiredWithSettings(
+      Encoding::PLAIN, Compression::BROTLI, false, true, LARGE_SIZE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) {
+  this->TestRequiredWithSettings(
+      Encoding::PLAIN, Compression::GZIP, false, true, LARGE_SIZE);
+}
+
+TYPED_TEST(TestPrimitiveWriter, Optional) {
+  // Optional and non-repeated, with definition levels
+  // but no repetition levels
+  this->SetUpSchema(Repetition::OPTIONAL);
+
+  this->GenerateData(SMALL_SIZE);
+  std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
+  definition_levels[1] = 0;
+
+  auto writer = this->BuildWriter();
+  writer->WriteBatch(
+      this->values_.size(), definition_levels.data(), nullptr, this->values_ptr_);
+  writer->Close();
+
+  // PARQUET-703
+  ASSERT_EQ(100, this->metadata_num_values());
+
+  this->ReadColumn();
+  ASSERT_EQ(99, this->values_read_);
+  this->values_out_.resize(99);
+  this->values_.resize(99);
+  ASSERT_EQ(this->values_, this->values_out_);
+}
+
+TYPED_TEST(TestPrimitiveWriter, OptionalSpaced) {
+  // Optional and non-repeated, with definition levels
+  // but no repetition levels
+  this->SetUpSchema(Repetition::OPTIONAL);
+
+  this->GenerateData(SMALL_SIZE);
+  std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
+  std::vector<uint8_t> valid_bits(::arrow::BitUtil::BytesForBits(SMALL_SIZE), 255);
+
+  definition_levels[SMALL_SIZE - 1] = 0;
+  ::arrow::BitUtil::ClearBit(valid_bits.data(), SMALL_SIZE - 1);
+  definition_levels[1] = 0;
+  ::arrow::BitUtil::ClearBit(valid_bits.data(), 1);
+
+  auto writer = this->BuildWriter();
+  writer->WriteBatchSpaced(this->values_.size(), definition_levels.data(), nullptr,
+      valid_bits.data(), 0, this->values_ptr_);
+  writer->Close();
+
+  // PARQUET-703
+  ASSERT_EQ(100, this->metadata_num_values());
+
+  this->ReadColumn();
+  ASSERT_EQ(98, this->values_read_);
+  this->values_out_.resize(98);
+  this->values_.resize(99);
+  this->values_.erase(this->values_.begin() + 1);
+  ASSERT_EQ(this->values_, this->values_out_);
+}
+
+TYPED_TEST(TestPrimitiveWriter, Repeated) {
+  // Optional and repeated, so definition and repetition levels
+  this->SetUpSchema(Repetition::REPEATED);
+
+  this->GenerateData(SMALL_SIZE);
+  std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
+  definition_levels[1] = 0;
+  std::vector<int16_t> repetition_levels(SMALL_SIZE, 0);
+
+  auto writer = this->BuildWriter();
+  writer->WriteBatch(this->values_.size(), definition_levels.data(),
+      repetition_levels.data(), this->values_ptr_);
+  writer->Close();
+
+  this->ReadColumn();
+  ASSERT_EQ(SMALL_SIZE - 1, this->values_read_);
+  this->values_out_.resize(SMALL_SIZE - 1);
+  this->values_.resize(SMALL_SIZE - 1);
+  ASSERT_EQ(this->values_, this->values_out_);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredTooFewRows) {
+  this->GenerateData(SMALL_SIZE - 1);
+
+  auto writer = this->BuildWriter();
+  writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
+  ASSERT_THROW(writer->Close(), ParquetException);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredTooMany) {
+  this->GenerateData(2 * SMALL_SIZE);
+
+  auto writer = this->BuildWriter();
+  ASSERT_THROW(
+      writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_),
+      ParquetException);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RepeatedTooFewRows) {
+  // Optional and repeated, so definition and repetition levels
+  this->SetUpSchema(Repetition::REPEATED);
+
+  this->GenerateData(SMALL_SIZE);
+  std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
+  definition_levels[1] = 0;
+  std::vector<int16_t> repetition_levels(SMALL_SIZE, 0);
+  repetition_levels[3] = 1;
+
+  auto writer = this->BuildWriter();
+  writer->WriteBatch(this->values_.size(), definition_levels.data(),
+      repetition_levels.data(), this->values_ptr_);
+  ASSERT_THROW(writer->Close(), ParquetException);
+}
+
+TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
+  this->GenerateData(LARGE_SIZE);
+
+  // Test case 1: required and non-repeated, so no definition or repetition levels
+  auto writer = this->BuildWriter(LARGE_SIZE);
+  writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
+  writer->Close();
+
+  // Just read the first SMALL_SIZE rows to ensure we can read them back in
+  this->ReadColumn();
+  ASSERT_EQ(SMALL_SIZE, this->values_read_);
+  this->values_.resize(SMALL_SIZE);
+  ASSERT_EQ(this->values_, this->values_out_);
+}
+
+// Test case for dictionary fallback encoding
+TYPED_TEST(TestPrimitiveWriter, RequiredVeryLargeChunk) {
+  this->GenerateData(VERY_LARGE_SIZE);
+
+  auto writer = this->BuildWriter(VERY_LARGE_SIZE, Encoding::PLAIN_DICTIONARY);
+  writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
+  writer->Close();
+
+  // Read all rows to make sure that the non-dictionary pages are also read correctly
+  this->SetupValuesOut(VERY_LARGE_SIZE);
+  this->ReadColumnFully();
+  ASSERT_EQ(VERY_LARGE_SIZE, this->values_read_);
+  this->values_.resize(VERY_LARGE_SIZE);
+  ASSERT_EQ(this->values_, this->values_out_);
+  std::vector<Encoding::type> encodings = this->metadata_encodings();
+  // In a fallback case there are 3 encodings (RLE, PLAIN_DICTIONARY, PLAIN).
+  // Dictionary encoding is not allowed for the boolean type.
+  // In a non-dictionary case there are 2 encodings (RLE, PLAIN).
+  if (this->type_num() != Type::BOOLEAN) {
+    ASSERT_EQ(Encoding::PLAIN_DICTIONARY, encodings[0]);
+    ASSERT_EQ(Encoding::PLAIN, encodings[1]);
+    ASSERT_EQ(Encoding::RLE, encodings[2]);
+  } else {
+    ASSERT_EQ(Encoding::PLAIN, encodings[0]);
+    ASSERT_EQ(Encoding::RLE, encodings[1]);
+  }
+}
+
+// PARQUET-719
+// Test case for NULL values
+TEST_F(TestNullValuesWriter, OptionalNullValueChunk) {
+  this->SetUpSchema(Repetition::OPTIONAL);
+
+  this->GenerateData(LARGE_SIZE);
+
+  std::vector<int16_t> definition_levels(LARGE_SIZE, 0);
+  std::vector<int16_t> repetition_levels(LARGE_SIZE, 0);
+
+  auto writer = this->BuildWriter(LARGE_SIZE);
+  // All values being written are NULL
+  writer->WriteBatch(
+      this->values_.size(), definition_levels.data(), repetition_levels.data(), NULL);
+  writer->Close();
+
+  // Just read the first SMALL_SIZE rows to ensure we can read them back in
+  this->ReadColumn();
+  ASSERT_EQ(0, this->values_read_);
+}
+
+// PARQUET-764
+// Correct bitpacking for boolean write at non-byte boundaries
+using TestBooleanValuesWriter = TestPrimitiveWriter<BooleanType>;
+TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) {
+  this->SetUpSchema(Repetition::REQUIRED);
+  auto writer = this->BuildWriter();
+  for (int i = 0; i < SMALL_SIZE; i++) {
+    bool value = (i % 2 == 0);
+    writer->WriteBatch(1, nullptr, nullptr, &value);
+  }
+  writer->Close();
+  this->ReadColumn();
+  for (int i = 0; i < SMALL_SIZE; i++) {
+    ASSERT_EQ(i % 2 == 0, this->values_out_[i]) << i;
+  }
+}
+
+void GenerateLevels(int min_repeat_factor, int max_repeat_factor, int max_level,
+    std::vector<int16_t>& input_levels) {
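+  // Example: with min_repeat_factor = 0, max_repeat_factor = 1 and max_level = 3,
+  // this produces 0 1 3 (repeat_count = 1) followed by 0 0 1 1 3 3
+  // (repeat_count = 2), since `value` steps through 0, 1, 3 before exceeding
+  // max_level.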
+  // for each repetition count up to max_repeat_factor
+  for (int repeat = min_repeat_factor; repeat <= max_repeat_factor; repeat++) {
+    // repeat count increases by a factor of 2 for every iteration
+    int repeat_count = (1 << repeat);
+    // generate levels for this repetition count, up to the maximum level
+    int value = 0;
+    int bwidth = 0;
+    while (value <= max_level) {
+      for (int i = 0; i < repeat_count; i++) {
+        input_levels.push_back(value);
+      }
+      value = (2 << bwidth) - 1;
+      bwidth++;
+    }
+  }
+}
+
+void EncodeLevels(Encoding::type encoding, int max_level, int num_levels,
+    const int16_t* input_levels, std::vector<uint8_t>& bytes) {
+  LevelEncoder encoder;
+  int levels_count = 0;
+  bytes.resize(2 * num_levels);
+  ASSERT_EQ(2 * num_levels, static_cast<int>(bytes.size()));
+  // encode levels
+  if (encoding == Encoding::RLE) {
+    // leave space to write the rle length value
+    encoder.Init(encoding, max_level, num_levels, bytes.data() + sizeof(int32_t),
+        static_cast<int>(bytes.size()));
+
+    levels_count = encoder.Encode(num_levels, input_levels);
+    (reinterpret_cast<int32_t*>(bytes.data()))[0] = encoder.len();
+  } else {
+    encoder.Init(
+        encoding, max_level, num_levels, bytes.data(), static_cast<int>(bytes.size()));
+    levels_count = encoder.Encode(num_levels, input_levels);
+  }
+  ASSERT_EQ(num_levels, levels_count);
+}
+
+void VerifyDecodingLevels(Encoding::type encoding, int max_level,
+    std::vector<int16_t>& input_levels, std::vector<uint8_t>& bytes) {
+  LevelDecoder decoder;
+  int levels_count = 0;
+  std::vector<int16_t> output_levels;
+  int num_levels = static_cast<int>(input_levels.size());
+
+  output_levels.resize(num_levels);
+  ASSERT_EQ(num_levels, static_cast<int>(output_levels.size()));
+
+  // Decode levels and test with multiple decode calls
+  decoder.SetData(encoding, max_level, num_levels, bytes.data());
+  int decode_count = 4;
+  int num_inner_levels = num_levels / decode_count;
+  // Try multiple decoding on a single SetData call
+  for (int ct = 0; ct < decode_count; ct++) {
+    int offset = ct * num_inner_levels;
+    levels_count = decoder.Decode(num_inner_levels, output_levels.data());
+    ASSERT_EQ(num_inner_levels, levels_count);
+    for (int i = 0; i < num_inner_levels; i++) {
+      EXPECT_EQ(input_levels[i + offset], output_levels[i]);
+    }
+  }
+  // check the remaining levels
+  int num_levels_completed = decode_count * (num_levels / decode_count);
+  int num_remaining_levels = num_levels - num_levels_completed;
+  if (num_remaining_levels > 0) {
+    levels_count = decoder.Decode(num_remaining_levels, output_levels.data());
+    ASSERT_EQ(num_remaining_levels, levels_count);
+    for (int i = 0; i < num_remaining_levels; i++) {
+      EXPECT_EQ(input_levels[i + num_levels_completed], output_levels[i]);
+    }
+  }
+  // Test zero Decode values
+  ASSERT_EQ(0, decoder.Decode(1, output_levels.data()));
+}
+
+void VerifyDecodingMultipleSetData(Encoding::type encoding, int max_level,
+    std::vector<int16_t>& input_levels, std::vector<std::vector<uint8_t>>& bytes) {
+  LevelDecoder decoder;
+  int levels_count = 0;
+  std::vector<int16_t> output_levels;
+
+  // Decode levels and test with multiple SetData calls
+  int setdata_count = static_cast<int>(bytes.size());
+  int num_levels = static_cast<int>(input_levels.size()) / setdata_count;
+  output_levels.resize(num_levels);
+  // Try multiple SetData
+  for (int ct = 0; ct < setdata_count; ct++) {
+    int offset = ct * num_levels;
+    ASSERT_EQ(num_levels, static_cast<int>(output_levels.size()));
+    decoder.SetData(encoding, max_level, num_levels, bytes[ct].data());
+    levels_count = decoder.Decode(num_levels, output_levels.data());
+    ASSERT_EQ(num_levels, levels_count);
+    for (int i = 0; i < num_levels; i++) {
+      EXPECT_EQ(input_levels[i + offset], output_levels[i]);
+    }
+  }
+}
+
+// Test levels with maximum bit-width from 1 to 8,
+// increasing the repetition count for each iteration by a factor of 2
+TEST(TestLevels, TestLevelsDecodeMultipleBitWidth) {
+  int min_repeat_factor = 0;
+  int max_repeat_factor = 7;  // 128
+  int max_bit_width = 8;
+  std::vector<int16_t> input_levels;
+  std::vector<uint8_t> bytes;
+  Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED};
+
+  // for each encoding
+  for (int encode = 0; encode < 2; encode++) {
+    Encoding::type encoding = encodings[encode];
+    // BIT_PACKED requires a sequence of at least 8 values
+    if (encoding == Encoding::BIT_PACKED) min_repeat_factor = 3;
+    // for each maximum bit-width
+    for (int bit_width = 1; bit_width <= max_bit_width; bit_width++) {
+      // find the maximum level for the current bit_width
+      int max_level = (1 << bit_width) - 1;
+      // Generate levels
+      GenerateLevels(min_repeat_factor, max_repeat_factor, max_level, input_levels);
+      EncodeLevels(encoding, max_level, static_cast<int>(input_levels.size()),
+          input_levels.data(), bytes);
+      VerifyDecodingLevels(encoding, max_level, input_levels, bytes);
+      input_levels.clear();
+    }
+  }
+}
+
+// Test multiple decoder SetData calls
+TEST(TestLevels, TestLevelsDecodeMultipleSetData) {
+  int min_repeat_factor = 3;
+  int max_repeat_factor = 7;  // 128
+  int bit_width = 8;
+  int max_level = (1 << bit_width) - 1;
+  std::vector<int16_t> input_levels;
+  std::vector<std::vector<uint8_t>> bytes;
+  Encoding::type encodings[2] = {Encoding::RLE, Encoding::BIT_PACKED};
+  GenerateLevels(min_repeat_factor, max_repeat_factor, max_level, input_levels);
+  int num_levels = static_cast<int>(input_levels.size());
+  int setdata_factor = 8;
+  int split_level_size = num_levels / setdata_factor;
+  bytes.resize(setdata_factor);
+
+  // for each encoding
+  for (int encode = 0; encode < 2; encode++) {
+    Encoding::type encoding = encodings[encode];
+    for (int rf = 0; rf < setdata_factor; rf++) {
+      int offset = rf * split_level_size;
+      EncodeLevels(encoding, max_level, split_level_size,
+          input_levels.data() + offset, bytes[rf]);
+    }
+    VerifyDecodingMultipleSetData(encoding, max_level, input_levels, bytes);
+  }
+}
+
+TEST(TestLevelEncoder, MinimumBufferSize) {
+  // PARQUET-676, PARQUET-698
+  const int kNumToEncode = 1024;
+
+  std::vector<int16_t> levels;
+  for (int i = 0; i < kNumToEncode; ++i) {
+    if (i % 9 == 0) {
+      levels.push_back(0);
+    } else {
+      levels.push_back(1);
+    }
+  }
+
+  std::vector<uint8_t> output(
+      LevelEncoder::MaxBufferSize(Encoding::RLE, 1, kNumToEncode));
+
+  LevelEncoder encoder;
+  encoder.Init(
+      Encoding::RLE, 1, kNumToEncode, output.data(), static_cast<int>(output.size()));
+  int encode_count = encoder.Encode(kNumToEncode, levels.data());
+
+  ASSERT_EQ(kNumToEncode, encode_count);
+}
+
+TEST(TestLevelEncoder, MinimumBufferSize2) {
+  // PARQUET-708
+  // Test the worst case for bit_width=2 consisting of
+  // LiteralRun(size=8)
+  // RepeatedRun(size=8)
+  // LiteralRun(size=8)
+  // ...
+  const int kNumToEncode = 1024;
+
+  std::vector<int16_t> levels;
+  for (int i = 0; i < kNumToEncode; ++i) {
+    // This forces a literal run of 00000001
+    // followed by eight 1s
+    if ((i % 16) < 7) {
+      levels.push_back(0);
+    } else {
+      levels.push_back(1);
+    }
+  }
+
+  for (int bit_width = 1; bit_width <= 8; bit_width++) {
+    std::vector<uint8_t> output(
+        LevelEncoder::MaxBufferSize(Encoding::RLE, bit_width, kNumToEncode));
+
+    LevelEncoder encoder;
+    encoder.Init(Encoding::RLE, bit_width, kNumToEncode, output.data(),
+        static_cast<int>(output.size()));
+    int encode_count = encoder.Encode(kNumToEncode, levels.data());
+
+    ASSERT_EQ(kNumToEncode, encode_count);
+  }
+}
+
+}  // namespace test
+}  // namespace parquet
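
A condensed sketch of the round trip these tests exercise, using only calls
that appear in the fixture above (the REQUIRED INT32 descriptor `descr` and
the enclosing `namespace parquet` are assumed here; treat this as an
illustration of the wiring, not a canonical API walkthrough):

    format::ColumnChunk thrift_metadata;
    std::shared_ptr<WriterProperties> properties = default_writer_properties();
    auto metadata = ColumnChunkMetaDataBuilder::Make(
        properties, descr, reinterpret_cast<uint8_t*>(&thrift_metadata));
    std::unique_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
    std::unique_ptr<SerializedPageWriter> pager(new SerializedPageWriter(
        sink.get(), Compression::UNCOMPRESSED, metadata.get()));
    auto writer = std::static_pointer_cast<TypedColumnWriter<Int32Type>>(
        ColumnWriter::Make(metadata.get(), std::move(pager), /*expected_rows=*/3,
            properties.get()));

    int32_t values[] = {1, 2, 3};
    writer->WriteBatch(3, nullptr, nullptr, values);  // REQUIRED: no levels
    writer->Close();

    // Read the serialized pages back through the matching reader classes.
    std::unique_ptr<InMemoryInputStream> source(
        new InMemoryInputStream(sink->GetBuffer()));
    std::unique_ptr<SerializedPageReader> page_reader(new SerializedPageReader(
        std::move(source), /*num_rows=*/3, Compression::UNCOMPRESSED));
    TypedColumnReader<Int32Type> reader(descr, std::move(page_reader));

    int32_t out[3];
    int16_t def_levels[3];
    int16_t rep_levels[3];
    int64_t values_read = 0;
    reader.ReadBatch(3, def_levels, rep_levels, out, &values_read);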

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
new file mode 100644
index 0000000..c13d4a0
--- /dev/null
+++ b/src/parquet/column_writer.cc
@@ -0,0 +1,597 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/column_writer.h"
+
+#include "parquet/encoding-internal.h"
+#include "parquet/properties.h"
+#include "parquet/statistics.h"
+#include "parquet/util/logging.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/rle-encoding.h"
+
+namespace parquet {
+
+LevelEncoder::LevelEncoder() {}
+LevelEncoder::~LevelEncoder() {}
+
+void LevelEncoder::Init(Encoding::type encoding, int16_t max_level,
+    int num_buffered_values, uint8_t* data, int data_size) {
+  bit_width_ = BitUtil::Log2(max_level + 1);
+  encoding_ = encoding;
+  switch (encoding) {
+    case Encoding::RLE: {
+      rle_encoder_.reset(new RleEncoder(data, data_size, bit_width_));
+      break;
+    }
+    case Encoding::BIT_PACKED: {
+      int num_bytes =
+          static_cast<int>(BitUtil::Ceil(num_buffered_values * bit_width_, 8));
+      bit_packed_encoder_.reset(new BitWriter(data, num_bytes));
+      break;
+    }
+    default:
+      throw ParquetException("Unknown encoding type for levels.");
+  }
+}
+
+int LevelEncoder::MaxBufferSize(
+    Encoding::type encoding, int16_t max_level, int num_buffered_values) {
+  int bit_width = BitUtil::Log2(max_level + 1);
+  int num_bytes = 0;
+  switch (encoding) {
+    case Encoding::RLE: {
+      // TODO: Due to the way we currently check if the buffer is full enough,
+      // we need to have MinBufferSize as headroom.
+      num_bytes = RleEncoder::MaxBufferSize(bit_width, num_buffered_values) +
+                  RleEncoder::MinBufferSize(bit_width);
+      break;
+    }
+    case Encoding::BIT_PACKED: {
+      num_bytes = static_cast<int>(BitUtil::Ceil(num_buffered_values * bit_width, 8));
+      break;
+    }
+    default:
+      throw ParquetException("Unknown encoding type for levels.");
+  }
+  return num_bytes;
+}
+
+int LevelEncoder::Encode(int batch_size, const int16_t* levels) {
+  int num_encoded = 0;
+  if (!rle_encoder_ && !bit_packed_encoder_) {
+    throw ParquetException("Level encoders are not initialized.");
+  }
+
+  if (encoding_ == Encoding::RLE) {
+    for (int i = 0; i < batch_size; ++i) {
+      if (!rle_encoder_->Put(*(levels + i))) { break; }
+      ++num_encoded;
+    }
+    rle_encoder_->Flush();
+    rle_length_ = rle_encoder_->len();
+  } else {
+    for (int i = 0; i < batch_size; ++i) {
+      if (!bit_packed_encoder_->PutValue(*(levels + i), bit_width_)) { break; }
+      ++num_encoded;
+    }
+    bit_packed_encoder_->Flush();
+  }
+  return num_encoded;
+}
+
+// ----------------------------------------------------------------------
+// ColumnWriter
+
+std::shared_ptr<WriterProperties> default_writer_properties() {
+  static std::shared_ptr<WriterProperties> default_writer_properties =
+      WriterProperties::Builder().build();
+  return default_writer_properties;
+}
+
+ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
+    std::unique_ptr<PageWriter> pager, int64_t expected_rows, bool has_dictionary,
+    Encoding::type encoding, const WriterProperties* properties)
+    : metadata_(metadata),
+      descr_(metadata->descr()),
+      pager_(std::move(pager)),
+      expected_rows_(expected_rows),
+      has_dictionary_(has_dictionary),
+      encoding_(encoding),
+      properties_(properties),
+      allocator_(properties->memory_pool()),
+      pool_(properties->memory_pool()),
+      num_buffered_values_(0),
+      num_buffered_encoded_values_(0),
+      num_rows_(0),
+      total_bytes_written_(0),
+      closed_(false),
+      fallback_(false) {
+  definition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
+  repetition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
+  definition_levels_rle_ =
+      std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+  repetition_levels_rle_ =
+      std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+  uncompressed_data_ =
+      std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+  if (pager_->has_compressor()) {
+    compressed_data_ =
+        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+  }
+}
+
+void ColumnWriter::InitSinks() {
+  definition_levels_sink_->Clear();
+  repetition_levels_sink_->Clear();
+}
+
+void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
+  DCHECK(!closed_);
+  definition_levels_sink_->Write(
+      reinterpret_cast<const uint8_t*>(levels), sizeof(int16_t) * num_levels);
+}
+
+void ColumnWriter::WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) {
+  DCHECK(!closed_);
+  repetition_levels_sink_->Write(
+      reinterpret_cast<const uint8_t*>(levels), sizeof(int16_t) * num_levels);
+}
+
+// return the size of the encoded buffer
+int64_t ColumnWriter::RleEncodeLevels(
+    const Buffer& src_buffer, ResizableBuffer* dest_buffer, int16_t max_level) {
+  // TODO: This only works due to some RLE specifics
+  int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
+                         static_cast<int>(num_buffered_values_)) +
+                     sizeof(int32_t);
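+  // The extra sizeof(int32_t) reserves space for the RLE length prefix that is
+  // written in front of the encoded levels below.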
+
+  // Use Arrow::Buffer::shrink_to_fit = false: the underlying buffer only keeps
+  // growing, and resizing to a smaller size does not reallocate.
+  PARQUET_THROW_NOT_OK(dest_buffer->Resize(rle_size, false));
+
+  level_encoder_.Init(Encoding::RLE, max_level, static_cast<int>(num_buffered_values_),
+      dest_buffer->mutable_data() + sizeof(int32_t),
+      static_cast<int>(dest_buffer->size()) - sizeof(int32_t));
+  int encoded = level_encoder_.Encode(static_cast<int>(num_buffered_values_),
+      reinterpret_cast<const int16_t*>(src_buffer.data()));
+  DCHECK_EQ(encoded, num_buffered_values_);
+  reinterpret_cast<int32_t*>(dest_buffer->mutable_data())[0] = level_encoder_.len();
+  int64_t encoded_size = level_encoder_.len() + sizeof(int32_t);
+  return encoded_size;
+}
+
+void ColumnWriter::AddDataPage() {
+  int64_t definition_levels_rle_size = 0;
+  int64_t repetition_levels_rle_size = 0;
+
+  std::shared_ptr<Buffer> values = GetValuesBuffer();
+
+  if (descr_->max_definition_level() > 0) {
+    definition_levels_rle_size = RleEncodeLevels(definition_levels_sink_->GetBufferRef(),
+        definition_levels_rle_.get(), descr_->max_definition_level());
+  }
+
+  if (descr_->max_repetition_level() > 0) {
+    repetition_levels_rle_size = RleEncodeLevels(repetition_levels_sink_->GetBufferRef(),
+        repetition_levels_rle_.get(), descr_->max_repetition_level());
+  }
+
+  int64_t uncompressed_size =
+      definition_levels_rle_size + repetition_levels_rle_size + values->size();
+
+  // Use Arrow::Buffer::shrink_to_fit = false: the underlying buffer only keeps
+  // growing, and resizing to a smaller size does not reallocate.
+  PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false));
+
+  // Concatenate data into a single buffer
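+  // (Data page V1 body layout: repetition levels, then definition levels,
+  // then the encoded values.)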
+  uint8_t* uncompressed_ptr = uncompressed_data_->mutable_data();
+  memcpy(uncompressed_ptr, repetition_levels_rle_->data(), repetition_levels_rle_size);
+  uncompressed_ptr += repetition_levels_rle_size;
+  memcpy(uncompressed_ptr, definition_levels_rle_->data(), definition_levels_rle_size);
+  uncompressed_ptr += definition_levels_rle_size;
+  memcpy(uncompressed_ptr, values->data(), values->size());
+
+  EncodedStatistics page_stats = GetPageStatistics();
+  ResetPageStatistics();
+
+  std::shared_ptr<Buffer> compressed_data;
+  if (pager_->has_compressor()) {
+    pager_->Compress(*(uncompressed_data_.get()), compressed_data_.get());
+    compressed_data = compressed_data_;
+  } else {
+    compressed_data = uncompressed_data_;
+  }
+
+  // Write the page to OutputStream eagerly if there is no dictionary or
+  // if dictionary encoding has fallen back to PLAIN
+  if (has_dictionary_ && !fallback_) {  // Save pages until end of dictionary encoding
+    std::shared_ptr<Buffer> compressed_data_copy;
+    PARQUET_THROW_NOT_OK(compressed_data->Copy(
+        0, compressed_data->size(), allocator_, &compressed_data_copy));
+    CompressedDataPage page(compressed_data_copy,
+        static_cast<int32_t>(num_buffered_values_), encoding_, Encoding::RLE,
+        Encoding::RLE, uncompressed_size, page_stats);
+    data_pages_.push_back(std::move(page));
+  } else {  // Eagerly write pages
+    CompressedDataPage page(compressed_data, static_cast<int32_t>(num_buffered_values_),
+        encoding_, Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats);
+    WriteDataPage(page);
+  }
+
+  // Re-initialize the sinks for the next Page.
+  InitSinks();
+  num_buffered_values_ = 0;
+  num_buffered_encoded_values_ = 0;
+}
+
+void ColumnWriter::WriteDataPage(const CompressedDataPage& page) {
+  total_bytes_written_ += pager_->WriteDataPage(page);
+}
+
+int64_t ColumnWriter::Close() {
+  if (!closed_) {
+    closed_ = true;
+    if (has_dictionary_ && !fallback_) { WriteDictionaryPage(); }
+
+    FlushBufferedDataPages();
+
+    EncodedStatistics chunk_statistics = GetChunkStatistics();
+    if (chunk_statistics.is_set()) metadata_->SetStatistics(chunk_statistics);
+    pager_->Close(has_dictionary_, fallback_);
+  }
+
+  if (num_rows_ != expected_rows_) {
+    std::stringstream ss;
+    ss << "Written rows: " << num_rows_ << " != expected rows: " << expected_rows_
+       << "in the current column chunk";
+    throw ParquetException(ss.str());
+  }
+
+  return total_bytes_written_;
+}
+
+void ColumnWriter::FlushBufferedDataPages() {
+  // Write all outstanding data to a new page
+  if (num_buffered_values_ > 0) { AddDataPage(); }
+  for (size_t i = 0; i < data_pages_.size(); i++) {
+    WriteDataPage(data_pages_[i]);
+  }
+  data_pages_.clear();
+}
+
+// ----------------------------------------------------------------------
+// TypedColumnWriter
+
+template <typename Type>
+TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
+    std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
+    const WriterProperties* properties)
+    : ColumnWriter(metadata, std::move(pager), expected_rows,
+          (encoding == Encoding::PLAIN_DICTIONARY ||
+              encoding == Encoding::RLE_DICTIONARY),
+          encoding, properties) {
+  switch (encoding) {
+    case Encoding::PLAIN:
+      current_encoder_.reset(new PlainEncoder<Type>(descr_, properties->memory_pool()));
+      break;
+    case Encoding::PLAIN_DICTIONARY:
+    case Encoding::RLE_DICTIONARY:
+      current_encoder_.reset(
+          new DictEncoder<Type>(descr_, &pool_, properties->memory_pool()));
+      break;
+    default:
+      ParquetException::NYI("Selected encoding is not supported");
+  }
+
+  if (properties->statistics_enabled(descr_->path())) {
+    page_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
+    chunk_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
+  }
+}
+
+// Only one Dictionary Page is written.
+// Falls back to PLAIN if the dictionary page size limit is reached.
+template <typename Type>
+void TypedColumnWriter<Type>::CheckDictionarySizeLimit() {
+  auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
+  if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) {
+    WriteDictionaryPage();
+    // Serialize the buffered Dictionary Indices
+    FlushBufferedDataPages();
+    fallback_ = true;
+    // Only PLAIN encoding is supported for fallback in V1
+    current_encoder_.reset(new PlainEncoder<Type>(descr_, properties_->memory_pool()));
+    encoding_ = Encoding::PLAIN;
+  }
+}
+
+template <typename Type>
+void TypedColumnWriter<Type>::WriteDictionaryPage() {
+  auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
+  std::shared_ptr<PoolBuffer> buffer =
+      AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
+  dict_encoder->WriteDict(buffer->mutable_data());
+  // TODO Get rid of this deep call
+  dict_encoder->mem_pool()->FreeAll();
+
+  DictionaryPage page(
+      buffer, dict_encoder->num_entries(), properties_->dictionary_index_encoding());
+  total_bytes_written_ += pager_->WriteDictionaryPage(page);
+}
+
+template <typename Type>
+EncodedStatistics TypedColumnWriter<Type>::GetPageStatistics() {
+  EncodedStatistics result;
+  if (page_statistics_) result = page_statistics_->Encode();
+  return result;
+}
+
+template <typename Type>
+EncodedStatistics TypedColumnWriter<Type>::GetChunkStatistics() {
+  EncodedStatistics result;
+  if (chunk_statistics_) result = chunk_statistics_->Encode();
+  return result;
+}
+
+template <typename Type>
+void TypedColumnWriter<Type>::ResetPageStatistics() {
+  if (chunk_statistics_ != nullptr) {
+    chunk_statistics_->Merge(*page_statistics_);
+    page_statistics_->Reset();
+  }
+}
+
+// ----------------------------------------------------------------------
+// Dynamic column writer constructor
+
+std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata,
+    std::unique_ptr<PageWriter> pager, int64_t expected_rows,
+    const WriterProperties* properties) {
+  const ColumnDescriptor* descr = metadata->descr();
+  Encoding::type encoding = properties->encoding(descr->path());
+  if (properties->dictionary_enabled(descr->path()) &&
+      descr->physical_type() != Type::BOOLEAN) {
+    encoding = properties->dictionary_page_encoding();
+  }
+  switch (descr->physical_type()) {
+    case Type::BOOLEAN:
+      return std::make_shared<BoolWriter>(
+          metadata, std::move(pager), expected_rows, encoding, properties);
+    case Type::INT32:
+      return std::make_shared<Int32Writer>(
+          metadata, std::move(pager), expected_rows, encoding, properties);
+    case Type::INT64:
+      return std::make_shared<Int64Writer>(
+          metadata, std::move(pager), expected_rows, encoding, properties);
+    case Type::INT96:
+      return std::make_shared<Int96Writer>(
+          metadata, std::move(pager), expected_rows, encoding, properties);
+    case Type::FLOAT:
+      return std::make_shared<FloatWriter>(
+          metadata, std::move(pager), expected_rows, encoding, properties);
+    case Type::DOUBLE:
+      return std::make_shared<DoubleWriter>(
+          metadata, std::move(pager), expected_rows, encoding, properties);
+    case Type::BYTE_ARRAY:
+      return std::make_shared<ByteArrayWriter>(
+          metadata, std::move(pager), expected_rows, encoding, properties);
+    case Type::FIXED_LEN_BYTE_ARRAY:
+      return std::make_shared<FixedLenByteArrayWriter>(
+          metadata, std::move(pager), expected_rows, encoding, properties);
+    default:
+      ParquetException::NYI("type writer not implemented");
+  }
+  // Unreachable code, but suppress compiler warning
+  return std::shared_ptr<ColumnWriter>(nullptr);
+}
+
+// ----------------------------------------------------------------------
+// Instantiate templated classes
+
+template <typename DType>
+inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
+    const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
+  int64_t values_to_write = 0;
+  // If the field is required and non-repeated, there are no definition levels
+  if (descr_->max_definition_level() > 0) {
+    for (int64_t i = 0; i < num_values; ++i) {
+      if (def_levels[i] == descr_->max_definition_level()) { ++values_to_write; }
+    }
+
+    WriteDefinitionLevels(num_values, def_levels);
+  } else {
+    // Required field, write all values
+    values_to_write = num_values;
+  }
+
+  // Not present for non-repeated fields
+  if (descr_->max_repetition_level() > 0) {
+    // A row could include more than one value
+    // Count the occasions where we start a new row
+    for (int64_t i = 0; i < num_values; ++i) {
+      if (rep_levels[i] == 0) { num_rows_++; }
+    }
+
+    WriteRepetitionLevels(num_values, rep_levels);
+  } else {
+    // Each value is exactly one row
+    num_rows_ += static_cast<int>(num_values);
+  }
+
+  if (num_rows_ > expected_rows_) {
+    throw ParquetException("More rows were written in the column chunk than expected");
+  }
+
+  // PARQUET-780
+  if (values_to_write > 0) { DCHECK(nullptr != values) << "Values ptr cannot be NULL"; }
+
+  WriteValues(values_to_write, values);
+
+  if (page_statistics_ != nullptr) {
+    page_statistics_->Update(values, values_to_write, num_values - values_to_write);
+  }
+
+  num_buffered_values_ += num_values;
+  num_buffered_encoded_values_ += values_to_write;
+
+  if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
+    AddDataPage();
+  }
+  if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); }
+
+  return values_to_write;
+}
+
+template <typename DType>
+inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(int64_t num_values,
+    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+    int64_t valid_bits_offset, const T* values, int64_t* num_spaced_written) {
+  int64_t values_to_write = 0;
+  int64_t spaced_values_to_write = 0;
+  // If the field is required and non-repeated, there are no definition levels
+  if (descr_->max_definition_level() > 0) {
+    // Minimal definition level for which spaced values are written
+    int16_t min_spaced_def_level = descr_->max_definition_level();
+    if (descr_->schema_node()->is_optional()) { min_spaced_def_level--; }
+    for (int64_t i = 0; i < num_values; ++i) {
+      if (def_levels[i] == descr_->max_definition_level()) { ++values_to_write; }
+      if (def_levels[i] >= min_spaced_def_level) { ++spaced_values_to_write; }
+    }
+
+    WriteDefinitionLevels(num_values, def_levels);
+  } else {
+    // Required field, write all values
+    values_to_write = num_values;
+    spaced_values_to_write = num_values;
+  }
+
+  // Not present for non-repeated fields
+  if (descr_->max_repetition_level() > 0) {
+    // A row could include more than one value
+    // Count the occasions where we start a new row
+    for (int64_t i = 0; i < num_values; ++i) {
+      if (rep_levels[i] == 0) { num_rows_++; }
+    }
+
+    WriteRepetitionLevels(num_values, rep_levels);
+  } else {
+    // Each value is exactly one row
+    num_rows_ += static_cast<int>(num_values);
+  }
+
+  if (num_rows_ > expected_rows_) {
+    throw ParquetException("More rows were written in the column chunk than expected");
+  }
+
+  if (descr_->schema_node()->is_optional()) {
+    WriteValuesSpaced(spaced_values_to_write, valid_bits, valid_bits_offset, values);
+  } else {
+    WriteValues(values_to_write, values);
+  }
+  *num_spaced_written = spaced_values_to_write;
+
+  if (page_statistics_ != nullptr) {
+    page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, values_to_write,
+        num_values - values_to_write);
+  }
+
+  num_buffered_values_ += num_values;
+  num_buffered_encoded_values_ += values_to_write;
+
+  if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
+    AddDataPage();
+  }
+  if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); }
+
+  return values_to_write;
+}
+
+template <typename DType>
+void TypedColumnWriter<DType>::WriteBatch(int64_t num_values, const int16_t* def_levels,
+    const int16_t* rep_levels, const T* values) {
+  // We check for DataPage limits only after we have inserted the values. If a user
+  // writes a large number of values, the DataPage size can be much above the limit.
+  // The purpose of this chunking is to bound that: even if a user writes a large
+  // number of values, the chunking ensures that AddDataPage() is called at a
+  // reasonable page size limit.
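+  // For example, num_values = 2500 with write_batch_size = 1000 gives two full
+  // mini-batches followed by a remainder of 500 values.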
+  int64_t write_batch_size = properties_->write_batch_size();
+  int num_batches = static_cast<int>(num_values / write_batch_size);
+  int64_t num_remaining = num_values % write_batch_size;
+  int64_t value_offset = 0;
+  for (int round = 0; round < num_batches; round++) {
+    int64_t offset = round * write_batch_size;
+    int64_t values_written = WriteMiniBatch(write_batch_size, &def_levels[offset],
+        &rep_levels[offset], &values[value_offset]);
+    value_offset += values_written;
+  }
+  // Write the remaining values
+  int64_t offset = num_batches * write_batch_size;
+  WriteMiniBatch(
+      num_remaining, &def_levels[offset], &rep_levels[offset], &values[value_offset]);
+}
+
+template <typename DType>
+void TypedColumnWriter<DType>::WriteBatchSpaced(int64_t num_values,
+    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+    int64_t valid_bits_offset, const T* values) {
+  // We check for DataPage limits only after we have inserted the values. If a user
+  // writes a large number of values, the DataPage size can be much above the limit.
+  // The purpose of this chunking is to bound that: even if a user writes a large
+  // number of values, the chunking ensures that AddDataPage() is called at a
+  // reasonable page size limit.
+  int64_t write_batch_size = properties_->write_batch_size();
+  int num_batches = static_cast<int>(num_values / write_batch_size);
+  int64_t num_remaining = num_values % write_batch_size;
+  int64_t num_spaced_written = 0;
+  int64_t values_offset = 0;
+  for (int round = 0; round < num_batches; round++) {
+    int64_t offset = round * write_batch_size;
+    WriteMiniBatchSpaced(write_batch_size, &def_levels[offset], &rep_levels[offset],
+        valid_bits, valid_bits_offset + values_offset, values + values_offset,
+        &num_spaced_written);
+    values_offset += num_spaced_written;
+  }
+  // Write the remaining values
+  int64_t offset = num_batches * write_batch_size;
+  WriteMiniBatchSpaced(num_remaining, &def_levels[offset], &rep_levels[offset],
+      valid_bits, valid_bits_offset + values_offset, values + values_offset,
+      &num_spaced_written);
+}
+
+template <typename DType>
+void TypedColumnWriter<DType>::WriteValues(int64_t num_values, const T* values) {
+  current_encoder_->Put(values, static_cast<int>(num_values));
+}
+
+template <typename DType>
+void TypedColumnWriter<DType>::WriteValuesSpaced(int64_t num_values,
+    const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values) {
+  current_encoder_->PutSpaced(
+      values, static_cast<int>(num_values), valid_bits, valid_bits_offset);
+}
+
+template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<BooleanType>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int32Type>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int64Type>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int96Type>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<FloatType>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<DoubleType>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<ByteArrayType>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<FLBAType>;
+
+}  // namespace parquet
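
After a dictionary fallback like the one implemented in
CheckDictionarySizeLimit() above, a single column chunk carries pages with
mixed encodings. A sketch of inspecting this through the chunk metadata,
mirroring the tests' metadata_encodings() helper (`thrift_metadata` and
`descr` come from the writer setup and are assumptions here):

    auto accessor = ColumnChunkMetaData::Make(
        reinterpret_cast<const uint8_t*>(&thrift_metadata), descr);
    std::vector<Encoding::type> encodings = accessor->encodings();
    // After a fallback on a non-boolean column this contains
    // PLAIN_DICTIONARY, PLAIN and RLE (see RequiredVeryLargeChunk above).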

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column_writer.h b/src/parquet/column_writer.h
new file mode 100644
index 0000000..4e113de
--- /dev/null
+++ b/src/parquet/column_writer.h
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_COLUMN_WRITER_H
+#define PARQUET_COLUMN_WRITER_H
+
+#include <vector>
+
+#include "parquet/column_page.h"
+#include "parquet/encoding.h"
+#include "parquet/file/metadata.h"
+#include "parquet/properties.h"
+#include "parquet/schema.h"
+#include "parquet/statistics.h"
+#include "parquet/types.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/visibility.h"
+
+namespace parquet {
+
+class BitWriter;
+class RleEncoder;
+
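+// Usage sketch (mirrors the level tests; note that the int32 length prefix in
+// front of RLE-encoded levels, as seen in column_writer.cc, is written by the
+// caller and not by this class):
+//
+//   LevelEncoder encoder;
+//   encoder.Init(Encoding::RLE, max_level, num_levels, buffer, buffer_size);
+//   int encoded = encoder.Encode(num_levels, levels);  // #levels encoded
+//   int32_t nbytes = encoder.len();                    // RLE only
+//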
+class PARQUET_EXPORT LevelEncoder {
+ public:
+  LevelEncoder();
+  ~LevelEncoder();
+
+  static int MaxBufferSize(
+      Encoding::type encoding, int16_t max_level, int num_buffered_values);
+
+  // Initialize the LevelEncoder.
+  void Init(Encoding::type encoding, int16_t max_level, int num_buffered_values,
+      uint8_t* data, int data_size);
+
+  // Encodes a batch of levels from an array and returns the number of levels encoded
+  int Encode(int batch_size, const int16_t* levels);
+
+  int32_t len() {
+    if (encoding_ != Encoding::RLE) {
+      throw ParquetException("Only implemented for RLE encoding");
+    }
+    return rle_length_;
+  }
+
+ private:
+  int bit_width_;
+  int rle_length_;
+  Encoding::type encoding_;
+  std::unique_ptr<RleEncoder> rle_encoder_;
+  std::unique_ptr<BitWriter> bit_packed_encoder_;
+};
+
+static constexpr int WRITE_BATCH_SIZE = 1000;
+
+class PARQUET_EXPORT ColumnWriter {
+ public:
+  ColumnWriter(ColumnChunkMetaDataBuilder*, std::unique_ptr<PageWriter>,
+      int64_t expected_rows, bool has_dictionary, Encoding::type encoding,
+      const WriterProperties* properties);
+
+  static std::shared_ptr<ColumnWriter> Make(ColumnChunkMetaDataBuilder*,
+      std::unique_ptr<PageWriter>, int64_t expected_rows,
+      const WriterProperties* properties);
+
+  Type::type type() const { return descr_->physical_type(); }
+
+  const ColumnDescriptor* descr() const { return descr_; }
+
+  /**
+   * Closes the ColumnWriter, commits any buffered values to pages.
+   *
+   * @return Total size of the column in bytes
+   */
+  int64_t Close();
+
+ protected:
+  virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
+
+  // Serializes Dictionary Page if enabled
+  virtual void WriteDictionaryPage() = 0;
+
+  // Checks if the Dictionary Page size limit is reached.
+  // If the limit is reached, the Dictionary and Data Pages are serialized
+  // and the encoding is switched to PLAIN.
+  virtual void CheckDictionarySizeLimit() = 0;
+
+  // Plain-encoded statistics of the current page
+  virtual EncodedStatistics GetPageStatistics() = 0;
+
+  // Plain-encoded statistics of the whole chunk
+  virtual EncodedStatistics GetChunkStatistics() = 0;
+
+  // Merges page statistics into chunk statistics, then resets the values
+  virtual void ResetPageStatistics() = 0;
+
+  // Adds Data Pages to an in memory buffer in dictionary encoding mode
+  // Serializes the Data Pages in other encoding modes
+  void AddDataPage();
+
+  // Serializes Data Pages
+  void WriteDataPage(const CompressedDataPage& page);
+
+  // Write multiple definition levels
+  void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels);
+
+  // Write multiple repetition levels
+  void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels);
+
+  // RLE encode the src_buffer into dest_buffer and return the encoded size
+  int64_t RleEncodeLevels(
+      const Buffer& src_buffer, ResizableBuffer* dest_buffer, int16_t max_level);
+
+  // Serialize the buffered Data Pages
+  void FlushBufferedDataPages();
+
+  ColumnChunkMetaDataBuilder* metadata_;
+  const ColumnDescriptor* descr_;
+
+  std::unique_ptr<PageWriter> pager_;
+
+  // The number of rows that should be written in this column chunk.
+  int64_t expected_rows_;
+  bool has_dictionary_;
+  Encoding::type encoding_;
+  const WriterProperties* properties_;
+
+  LevelEncoder level_encoder_;
+
+  ::arrow::MemoryPool* allocator_;
+  ChunkedAllocator pool_;
+
+  // The total number of values stored in the data page. This is the maximum of
+  // the number of encoded definition levels or encoded values. For
+  // non-repeated, required columns, this is equal to the number of encoded
+  // values. For repeated or optional values, there may be fewer data values
+  // than levels, and this tells you how many encoded levels there are in that
+  // case.
+  int64_t num_buffered_values_;
+
+  // The total number of stored values. For repeated or optional values, this
+  // number may be lower than num_buffered_values_.
+  int64_t num_buffered_encoded_values_;
+
+  // Total number of rows written with this ColumnWriter
+  int num_rows_;
+
+  // Records the total number of bytes written by the serializer
+  int64_t total_bytes_written_;
+
+  // Flag to check if the Writer has been closed
+  bool closed_;
+
+  // Flag to infer if dictionary encoding has fallen back to PLAIN
+  bool fallback_;
+
+  std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
+  std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
+
+  std::shared_ptr<ResizableBuffer> definition_levels_rle_;
+  std::shared_ptr<ResizableBuffer> repetition_levels_rle_;
+
+  std::shared_ptr<ResizableBuffer> uncompressed_data_;
+  std::shared_ptr<ResizableBuffer> compressed_data_;
+
+  std::vector<CompressedDataPage> data_pages_;
+
+ private:
+  void InitSinks();
+};
+
+// API to write values to a single column. This is the main client-facing API.
+template <typename DType>
+class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
+ public:
+  typedef typename DType::c_type T;
+
+  TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
+      std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
+      const WriterProperties* properties);
+
+  // Write a batch of repetition levels, definition levels, and values to the
+  // column.
+  void WriteBatch(int64_t num_values, const int16_t* def_levels,
+      const int16_t* rep_levels, const T* values);
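+
+  // A usage sketch (hypothetical setup; rg_writer stands in for a
+  // RowGroupWriter obtained from a ParquetFileWriter; for a REQUIRED,
+  // non-repeated column the level pointers may be null):
+  //
+  //   std::vector<int32_t> values = {1, 2, 3};
+  //   auto* writer = static_cast<Int32Writer*>(rg_writer->NextColumn());
+  //   writer->WriteBatch(
+  //       static_cast<int64_t>(values.size()), nullptr, nullptr, values.data());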
+
+  /// Write a batch of repetition levels, definition levels, and values to the
+  /// column.
+  ///
+  /// In comparison to WriteBatch, the repetition and definition levels have
+  /// the same length as the number of values read when
+  /// max_definition_level == 1. When max_definition_level > 1, the repetition
+  /// and definition levels are longer than the values, but the values include
+  /// the null entries with definition_level == (max_definition_level - 1).
+  /// Thus the parameters of this function must distinguish whether an input
+  /// has the length of num_values or the _number of rows in the lowest
+  /// nesting level_.
+  ///
+  /// If the innermost node in the Parquet schema is required, the _number of
+  /// rows in the lowest nesting level_ is equal to the number of non-null
+  /// values. If the innermost schema node is optional, the _number of rows in
+  /// the lowest nesting level_ also includes all values with
+  /// definition_level == (max_definition_level - 1).
+  ///
+  /// @param num_values number of levels to write.
+  /// @param def_levels The Parquet definition levels; length is num_values.
+  /// @param rep_levels The Parquet repetition levels; length is num_values.
+  /// @param valid_bits Bitmap that indicates whether the entry is null at the
+  ///   lowest nesting level. The length is the number of rows in the lowest
+  ///   nesting level.
+  /// @param valid_bits_offset The offset in bits into valid_bits at which the
+  ///   first relevant bit resides.
+  /// @param values The values at the lowest nesting level, including spacing
+  ///   for nulls at the lowest levels; the input has the length of the number
+  ///   of rows at the lowest nesting level.
+  void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
+      const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
+      const T* values);
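+
+  // A usage sketch (hypothetical; assumes an OPTIONAL, non-repeated column
+  // with max_definition_level == 1, so valid_bits holds one bit per row):
+  //
+  //   std::vector<int32_t> values = {1, 0 /* null slot */, 3};
+  //   std::vector<int16_t> def_levels = {1, 0, 1};
+  //   uint8_t valid_bits[] = {0x05};  // entries 0 and 2 are valid
+  //   writer->WriteBatchSpaced(3, def_levels.data(), nullptr, valid_bits,
+  //       /*valid_bits_offset=*/0, values.data());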
+
+ protected:
+  std::shared_ptr<Buffer> GetValuesBuffer() override {
+    return current_encoder_->FlushValues();
+  }
+  void WriteDictionaryPage() override;
+  void CheckDictionarySizeLimit() override;
+  EncodedStatistics GetPageStatistics() override;
+  EncodedStatistics GetChunkStatistics() override;
+  void ResetPageStatistics() override;
+
+ private:
+  int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
+      const int16_t* rep_levels, const T* values);
+
+  int64_t WriteMiniBatchSpaced(int64_t num_values, const int16_t* def_levels,
+      const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
+      const T* values, int64_t* num_spaced_written);
+
+  typedef Encoder<DType> EncoderType;
+
+  // Write values to a temporary buffer before they are encoded into pages
+  void WriteValues(int64_t num_values, const T* values);
+  void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits,
+      int64_t valid_bits_offset, const T* values);
+  std::unique_ptr<EncoderType> current_encoder_;
+
+  typedef TypedRowGroupStatistics<DType> TypedStats;
+  std::unique_ptr<TypedStats> page_statistics_;
+  std::unique_ptr<TypedStats> chunk_statistics_;
+};
+
+typedef TypedColumnWriter<BooleanType> BoolWriter;
+typedef TypedColumnWriter<Int32Type> Int32Writer;
+typedef TypedColumnWriter<Int64Type> Int64Writer;
+typedef TypedColumnWriter<Int96Type> Int96Writer;
+typedef TypedColumnWriter<FloatType> FloatWriter;
+typedef TypedColumnWriter<DoubleType> DoubleWriter;
+typedef TypedColumnWriter<ByteArrayType> ByteArrayWriter;
+typedef TypedColumnWriter<FLBAType> FixedLenByteArrayWriter;
+
+extern template class PARQUET_EXPORT TypedColumnWriter<BooleanType>;
+extern template class PARQUET_EXPORT TypedColumnWriter<Int32Type>;
+extern template class PARQUET_EXPORT TypedColumnWriter<Int64Type>;
+extern template class PARQUET_EXPORT TypedColumnWriter<Int96Type>;
+extern template class PARQUET_EXPORT TypedColumnWriter<FloatType>;
+extern template class PARQUET_EXPORT TypedColumnWriter<DoubleType>;
+extern template class PARQUET_EXPORT TypedColumnWriter<ByteArrayType>;
+extern template class PARQUET_EXPORT TypedColumnWriter<FLBAType>;
+
+}  // namespace parquet
+
+#endif  // PARQUET_COLUMN_WRITER_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/encoding-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h
index 7e78123..61b8e24 100644
--- a/src/parquet/encoding-internal.h
+++ b/src/parquet/encoding-internal.h
@@ -464,9 +464,8 @@ class DictEncoder : public Encoder<DType> {
     // reserve
     // an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used
     // but not reserving them would cause the encoder to fail.
-    return 1 +
-           RleEncoder::MaxBufferSize(
-               bit_width(), static_cast<int>(buffered_indices_.size())) +
+    return 1 + RleEncoder::MaxBufferSize(
+                   bit_width(), static_cast<int>(buffered_indices_.size())) +
            RleEncoder::MinBufferSize(bit_width());
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/file-deserialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc
index 7c4690e..59d2051 100644
--- a/src/parquet/file/file-deserialize-test.cc
+++ b/src/parquet/file/file-deserialize-test.cc
@@ -26,7 +26,7 @@
 #include <string>
 #include <vector>
 
-#include "parquet/column/page.h"
+#include "parquet/column_page.h"
 #include "parquet/exception.h"
 #include "parquet/file/reader-internal.h"
 #include "parquet/parquet_types.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/file-metadata-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc
index a4a2016..10ce40c 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/file/file-metadata-test.cc
@@ -15,9 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "parquet/column/statistics.h"
 #include "parquet/file/metadata.h"
 #include "parquet/schema.h"
+#include "parquet/statistics.h"
 #include "parquet/types.h"
 #include <gtest/gtest.h>
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/file-serialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc
index 7a90eeb..5736fa1 100644
--- a/src/parquet/file/file-serialize-test.cc
+++ b/src/parquet/file/file-serialize-test.cc
@@ -17,12 +17,12 @@
 
 #include <gtest/gtest.h>
 
-#include "parquet/column/reader.h"
-#include "parquet/column/test-specialization.h"
-#include "parquet/column/test-util.h"
-#include "parquet/column/writer.h"
+#include "parquet/column_reader.h"
+#include "parquet/column_writer.h"
 #include "parquet/file/reader.h"
 #include "parquet/file/writer.h"
+#include "parquet/test-specialization.h"
+#include "parquet/test-util.h"
 #include "parquet/types.h"
 #include "parquet/util/memory.h"
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/metadata.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 50d2114..2dc50d1 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -24,9 +24,9 @@
 
 #include "arrow/util/key_value_metadata.h"
 
-#include "parquet/column/properties.h"
-#include "parquet/column/statistics.h"
+#include "parquet/properties.h"
 #include "parquet/schema.h"
+#include "parquet/statistics.h"
 #include "parquet/types.h"
 #include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/printer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/printer.cc b/src/parquet/file/printer.cc
index e398c3a..52b2598 100644
--- a/src/parquet/file/printer.cc
+++ b/src/parquet/file/printer.cc
@@ -20,7 +20,7 @@
 #include <string>
 #include <vector>
 
-#include "parquet/column/scanner.h"
+#include "parquet/column_scanner.h"
 
 using std::string;
 using std::vector;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index 1d9ab47..c39d3eb 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -26,7 +26,7 @@
 
 #include "arrow/util/compression.h"
 
-#include "parquet/column/page.h"
+#include "parquet/column_page.h"
 #include "parquet/exception.h"
 #include "parquet/schema.h"
 #include "parquet/thrift.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/reader-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h
index 1ac2384..2667fa8 100644
--- a/src/parquet/file/reader-internal.h
+++ b/src/parquet/file/reader-internal.h
@@ -22,11 +22,11 @@
 #include <memory>
 #include <vector>
 
-#include "parquet/column/page.h"
-#include "parquet/column/properties.h"
+#include "parquet/column_page.h"
 #include "parquet/file/metadata.h"
 #include "parquet/file/reader.h"
 #include "parquet/parquet_types.h"
+#include "parquet/properties.h"
 #include "parquet/types.h"
 #include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index 7bf2c76..d3247cb 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -25,9 +25,9 @@
 
 #include "arrow/io/file.h"
 
-#include "parquet/column/page.h"
-#include "parquet/column/reader.h"
-#include "parquet/column/scanner.h"
+#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
+#include "parquet/column_scanner.h"
 #include "parquet/exception.h"
 #include "parquet/file/reader-internal.h"
 #include "parquet/types.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h
index 7d3c3f9..1cd287c 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file/reader.h
@@ -25,11 +25,11 @@
 #include <string>
 #include <vector>
 
-#include "parquet/column/page.h"
-#include "parquet/column/properties.h"
-#include "parquet/column/statistics.h"
+#include "parquet/column_page.h"
 #include "parquet/file/metadata.h"
+#include "parquet/properties.h"
 #include "parquet/schema.h"
+#include "parquet/statistics.h"
 #include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index bb24737..1cceb95 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -22,7 +22,7 @@
 
 #include "arrow/util/compression.h"
 
-#include "parquet/column/writer.h"
+#include "parquet/column_writer.h"
 #include "parquet/schema-internal.h"
 #include "parquet/schema.h"
 #include "parquet/thrift.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 6ac7927..447579a 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -21,7 +21,7 @@
 #include <memory>
 #include <vector>
 
-#include "parquet/column/page.h"
+#include "parquet/column_page.h"
 #include "parquet/file/metadata.h"
 #include "parquet/file/writer.h"
 #include "parquet/parquet_types.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/file/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h
index 7d48720..b22281a 100644
--- a/src/parquet/file/writer.h
+++ b/src/parquet/file/writer.h
@@ -21,8 +21,8 @@
 #include <cstdint>
 #include <memory>
 
-#include "parquet/column/properties.h"
 #include "parquet/file/metadata.h"
+#include "parquet/properties.h"
 #include "parquet/schema.h"
 #include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/properties-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/properties-test.cc b/src/parquet/properties-test.cc
new file mode 100644
index 0000000..0e6d725
--- /dev/null
+++ b/src/parquet/properties-test.cc
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <string>
+
+#include "parquet/file/reader.h"
+#include "parquet/properties.h"
+
+namespace parquet {
+
+using schema::ColumnPath;
+
+namespace test {
+
+TEST(TestReaderProperties, Basics) {
+  ReaderProperties props;
+
+  ASSERT_EQ(DEFAULT_BUFFER_SIZE, props.buffer_size());
+  ASSERT_EQ(DEFAULT_USE_BUFFERED_STREAM, props.is_buffered_stream_enabled());
+}
+
+TEST(TestWriterProperties, Basics) {
+  std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
+
+  ASSERT_EQ(DEFAULT_PAGE_SIZE, props->data_pagesize());
+  ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT, props->dictionary_pagesize_limit());
+  ASSERT_EQ(DEFAULT_WRITER_VERSION, props->version());
+}
+
+TEST(TestWriterProperties, AdvancedHandling) {
+  WriterProperties::Builder builder;
+  builder.compression("gzip", Compression::GZIP);
+  builder.compression(Compression::SNAPPY);
+  builder.encoding(Encoding::DELTA_BINARY_PACKED);
+  builder.encoding("delta-length", Encoding::DELTA_LENGTH_BYTE_ARRAY);
+  std::shared_ptr<WriterProperties> props = builder.build();
+
+  ASSERT_EQ(Compression::GZIP, props->compression(ColumnPath::FromDotString("gzip")));
+  ASSERT_EQ(
+      Compression::SNAPPY, props->compression(ColumnPath::FromDotString("delta-length")));
+  ASSERT_EQ(
+      Encoding::DELTA_BINARY_PACKED, props->encoding(ColumnPath::FromDotString("gzip")));
+  ASSERT_EQ(Encoding::DELTA_LENGTH_BYTE_ARRAY,
+      props->encoding(ColumnPath::FromDotString("delta-length")));
+}
+
+}  // namespace test
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/properties.h
----------------------------------------------------------------------
diff --git a/src/parquet/properties.h b/src/parquet/properties.h
new file mode 100644
index 0000000..3ebc3b7
--- /dev/null
+++ b/src/parquet/properties.h
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_COLUMN_PROPERTIES_H
+#define PARQUET_COLUMN_PROPERTIES_H
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+
+#include "parquet/exception.h"
+#include "parquet/parquet_version.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/visibility.h"
+
+namespace parquet {
+
+struct ParquetVersion {
+  enum type { PARQUET_1_0, PARQUET_2_0 };
+};
+
+static constexpr int64_t DEFAULT_BUFFER_SIZE = 0;
+static constexpr bool DEFAULT_USE_BUFFERED_STREAM = false;
+
+class PARQUET_EXPORT ReaderProperties {
+ public:
+  explicit ReaderProperties(::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+      : pool_(pool) {
+    buffered_stream_enabled_ = DEFAULT_USE_BUFFERED_STREAM;
+    buffer_size_ = DEFAULT_BUFFER_SIZE;
+  }
+
+  ::arrow::MemoryPool* memory_pool() const { return pool_; }
+
+  std::unique_ptr<InputStream> GetStream(
+      RandomAccessSource* source, int64_t start, int64_t num_bytes) {
+    std::unique_ptr<InputStream> stream;
+    if (buffered_stream_enabled_) {
+      stream.reset(
+          new BufferedInputStream(pool_, buffer_size_, source, start, num_bytes));
+    } else {
+      stream.reset(new InMemoryInputStream(source, start, num_bytes));
+    }
+    return stream;
+  }
+
+  bool is_buffered_stream_enabled() const { return buffered_stream_enabled_; }
+
+  void enable_buffered_stream() { buffered_stream_enabled_ = true; }
+
+  void disable_buffered_stream() { buffered_stream_enabled_ = false; }
+
+  void set_buffer_size(int64_t buf_size) { buffer_size_ = buf_size; }
+
+  int64_t buffer_size() const { return buffer_size_; }
+
+ private:
+  ::arrow::MemoryPool* pool_;
+  int64_t buffer_size_;
+  bool buffered_stream_enabled_;
+};
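+
+// A usage sketch (hypothetical buffer size; buffered streams read the column
+// chunk in smaller pieces instead of loading it into memory at once):
+//
+//   ReaderProperties props;
+//   props.enable_buffered_stream();
+//   props.set_buffer_size(4096);
+//   // Pass `props` when opening the file reader for these settings to
+//   // take effect.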
+
+ReaderProperties PARQUET_EXPORT default_reader_properties();
+
+static constexpr int64_t DEFAULT_PAGE_SIZE = 1024 * 1024;
+static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
+static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = DEFAULT_PAGE_SIZE;
+static constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;
+static constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;
+static constexpr Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
+static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
+    ParquetVersion::PARQUET_1_0;
+static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;
+static constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED;
+
+class PARQUET_EXPORT ColumnProperties {
+ public:
+  ColumnProperties(Encoding::type encoding = DEFAULT_ENCODING,
+      Compression::type codec = DEFAULT_COMPRESSION_TYPE,
+      bool dictionary_enabled = DEFAULT_IS_DICTIONARY_ENABLED,
+      bool statistics_enabled = DEFAULT_ARE_STATISTICS_ENABLED)
+      : encoding(encoding),
+        codec(codec),
+        dictionary_enabled(dictionary_enabled),
+        statistics_enabled(statistics_enabled) {}
+
+  Encoding::type encoding;
+  Compression::type codec;
+  bool dictionary_enabled;
+  bool statistics_enabled;
+};
+
+class PARQUET_EXPORT WriterProperties {
+ public:
+  class Builder {
+   public:
+    Builder()
+        : pool_(::arrow::default_memory_pool()),
+          dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
+          write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
+          pagesize_(DEFAULT_PAGE_SIZE),
+          version_(DEFAULT_WRITER_VERSION),
+          created_by_(DEFAULT_CREATED_BY) {}
+    virtual ~Builder() {}
+
+    Builder* memory_pool(::arrow::MemoryPool* pool) {
+      pool_ = pool;
+      return this;
+    }
+
+    Builder* enable_dictionary() {
+      default_column_properties_.dictionary_enabled = true;
+      return this;
+    }
+
+    Builder* disable_dictionary() {
+      default_column_properties_.dictionary_enabled = false;
+      return this;
+    }
+
+    Builder* enable_dictionary(const std::string& path) {
+      dictionary_enabled_[path] = true;
+      return this;
+    }
+
+    Builder* enable_dictionary(const std::shared_ptr<schema::ColumnPath>& path) {
+      return this->enable_dictionary(path->ToDotString());
+    }
+
+    Builder* disable_dictionary(const std::string& path) {
+      dictionary_enabled_[path] = false;
+      return this;
+    }
+
+    Builder* disable_dictionary(const std::shared_ptr<schema::ColumnPath>& path) {
+      return this->disable_dictionary(path->ToDotString());
+    }
+
+    Builder* dictionary_pagesize_limit(int64_t dictionary_psize_limit) {
+      dictionary_pagesize_limit_ = dictionary_psize_limit;
+      return this;
+    }
+
+    Builder* write_batch_size(int64_t write_batch_size) {
+      write_batch_size_ = write_batch_size;
+      return this;
+    }
+
+    Builder* data_pagesize(int64_t pg_size) {
+      pagesize_ = pg_size;
+      return this;
+    }
+
+    Builder* version(ParquetVersion::type version) {
+      version_ = version;
+      return this;
+    }
+
+    Builder* created_by(const std::string& created_by) {
+      created_by_ = created_by;
+      return this;
+    }
+
+    /**
+     * Define the encoding that is used when we do not use dictionary encoding.
+     *
+     * This applies either when dictionary encoding is disabled or when we
+     * fall back because the dictionary grew too large.
+     */
+    Builder* encoding(Encoding::type encoding_type) {
+      if (encoding_type == Encoding::PLAIN_DICTIONARY ||
+          encoding_type == Encoding::RLE_DICTIONARY) {
+        throw ParquetException("Can't use dictionary encoding as fallback encoding");
+      }
+
+      default_column_properties_.encoding = encoding_type;
+      return this;
+    }
+
+    /**
+     * Define the encoding that is used when we do not use dictionary encoding.
+     *
+     * This applies either when dictionary encoding is disabled or when we
+     * fall back because the dictionary grew too large.
+     */
+    Builder* encoding(const std::string& path, Encoding::type encoding_type) {
+      if (encoding_type == Encoding::PLAIN_DICTIONARY ||
+          encoding_type == Encoding::RLE_DICTIONARY) {
+        throw ParquetException("Can't use dictionary encoding as fallback encoding");
+      }
+
+      encodings_[path] = encoding_type;
+      return this;
+    }
+
+    /**
+     * Define the encoding that is used when we do not use dictionary encoding.
+     *
+     * This applies either when dictionary encoding is disabled or when we
+     * fall back because the dictionary grew too large.
+     */
+    Builder* encoding(
+        const std::shared_ptr<schema::ColumnPath>& path, Encoding::type encoding_type) {
+      return this->encoding(path->ToDotString(), encoding_type);
+    }
+
+    Builder* compression(Compression::type codec) {
+      default_column_properties_.codec = codec;
+      return this;
+    }
+
+    Builder* compression(const std::string& path, Compression::type codec) {
+      codecs_[path] = codec;
+      return this;
+    }
+
+    Builder* compression(
+        const std::shared_ptr<schema::ColumnPath>& path, Compression::type codec) {
+      return this->compression(path->ToDotString(), codec);
+    }
+
+    Builder* enable_statistics() {
+      default_column_properties_.statistics_enabled = true;
+      return this;
+    }
+
+    Builder* disable_statistics() {
+      default_column_properties_.statistics_enabled = false;
+      return this;
+    }
+
+    Builder* enable_statistics(const std::string& path) {
+      statistics_enabled_[path] = true;
+      return this;
+    }
+
+    Builder* enable_statistics(const std::shared_ptr<schema::ColumnPath>& path) {
+      return this->enable_statistics(path->ToDotString());
+    }
+
+    Builder* disable_statistics(const std::string& path) {
+      statistics_enabled_[path] = false;
+      return this;
+    }
+
+    Builder* disable_statistics(const std::shared_ptr<schema::ColumnPath>& path) {
+      return this->disable_statistics(path->ToDotString());
+    }
+
+    std::shared_ptr<WriterProperties> build() {
+      std::unordered_map<std::string, ColumnProperties> column_properties;
+      auto get = [&](const std::string& key) -> ColumnProperties& {
+        auto it = column_properties.find(key);
+        if (it == column_properties.end())
+          return column_properties[key] = default_column_properties_;
+        else
+          return it->second;
+      };
+
+      for (const auto& item : encodings_)
+        get(item.first).encoding = item.second;
+      for (const auto& item : codecs_)
+        get(item.first).codec = item.second;
+      for (const auto& item : dictionary_enabled_)
+        get(item.first).dictionary_enabled = item.second;
+      for (const auto& item : statistics_enabled_)
+        get(item.first).statistics_enabled = item.second;
+
+      return std::shared_ptr<WriterProperties>(new WriterProperties(pool_,
+          dictionary_pagesize_limit_, write_batch_size_, pagesize_, version_, created_by_,
+          default_column_properties_, column_properties));
+    }
+
+   private:
+    ::arrow::MemoryPool* pool_;
+    int64_t dictionary_pagesize_limit_;
+    int64_t write_batch_size_;
+    int64_t pagesize_;
+    ParquetVersion::type version_;
+    std::string created_by_;
+
+    // Settings used for each column unless overridden in any of the maps below
+    ColumnProperties default_column_properties_;
+    std::unordered_map<std::string, Encoding::type> encodings_;
+    std::unordered_map<std::string, Compression::type> codecs_;
+    std::unordered_map<std::string, bool> dictionary_enabled_;
+    std::unordered_map<std::string, bool> statistics_enabled_;
+  };
+
+  inline ::arrow::MemoryPool* memory_pool() const { return pool_; }
+
+  inline int64_t dictionary_pagesize_limit() const { return dictionary_pagesize_limit_; }
+
+  inline int64_t write_batch_size() const { return write_batch_size_; }
+
+  inline int64_t data_pagesize() const { return pagesize_; }
+
+  inline ParquetVersion::type version() const { return parquet_version_; }
+
+  inline std::string created_by() const { return parquet_created_by_; }
+
+  inline Encoding::type dictionary_index_encoding() const {
+    if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
+      return Encoding::PLAIN_DICTIONARY;
+    } else {
+      return Encoding::RLE_DICTIONARY;
+    }
+  }
+
+  inline Encoding::type dictionary_page_encoding() const {
+    if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
+      return Encoding::PLAIN_DICTIONARY;
+    } else {
+      return Encoding::PLAIN;
+    }
+  }
+
+  const ColumnProperties& column_properties(
+      const std::shared_ptr<schema::ColumnPath>& path) const {
+    auto it = column_properties_.find(path->ToDotString());
+    if (it != column_properties_.end()) return it->second;
+    return default_column_properties_;
+  }
+
+  Encoding::type encoding(const std::shared_ptr<schema::ColumnPath>& path) const {
+    return column_properties(path).encoding;
+  }
+
+  Compression::type compression(const std::shared_ptr<schema::ColumnPath>& path) const {
+    return column_properties(path).codec;
+  }
+
+  bool dictionary_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
+    return column_properties(path).dictionary_enabled;
+  }
+
+  bool statistics_enabled(const std::shared_ptr<schema::ColumnPath>& path) const {
+    return column_properties(path).statistics_enabled;
+  }
+
+ private:
+  explicit WriterProperties(::arrow::MemoryPool* pool, int64_t dictionary_pagesize_limit,
+      int64_t write_batch_size, int64_t pagesize, ParquetVersion::type version,
+      const std::string& created_by, const ColumnProperties& default_column_properties,
+      const std::unordered_map<std::string, ColumnProperties>& column_properties)
+      : pool_(pool),
+        dictionary_pagesize_limit_(dictionary_pagesize_limit),
+        write_batch_size_(write_batch_size),
+        pagesize_(pagesize),
+        parquet_version_(version),
+        parquet_created_by_(created_by),
+        default_column_properties_(default_column_properties),
+        column_properties_(column_properties) {}
+
+  ::arrow::MemoryPool* pool_;
+  int64_t dictionary_pagesize_limit_;
+  int64_t write_batch_size_;
+  int64_t pagesize_;
+  ParquetVersion::type parquet_version_;
+  std::string parquet_created_by_;
+  ColumnProperties default_column_properties_;
+  std::unordered_map<std::string, ColumnProperties> column_properties_;
+};
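+
+// A usage sketch of the Builder (hypothetical column path "col.a";
+// per-column settings override the builder-wide defaults):
+//
+//   std::shared_ptr<WriterProperties> props =
+//       WriterProperties::Builder()
+//           .compression(Compression::SNAPPY)          // default for all columns
+//           ->compression("col.a", Compression::GZIP)  // override one column
+//           ->disable_dictionary()
+//           ->build();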
+
+std::shared_ptr<WriterProperties> PARQUET_EXPORT default_writer_properties();
+
+}  // namespace parquet
+
+#endif  // PARQUET_COLUMN_PROPERTIES_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index fcce38b..cb40abb 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -25,8 +25,8 @@
 
 #include "arrow/io/file.h"
 
-#include "parquet/column/reader.h"
-#include "parquet/column/scanner.h"
+#include "parquet/column_reader.h"
+#include "parquet/column_scanner.h"
 #include "parquet/file/printer.h"
 #include "parquet/file/reader-internal.h"
 #include "parquet/file/reader.h"