You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@arrow.apache.org by uw...@apache.org on 2019/02/05 14:16:52 UTC

[arrow] branch master updated: PARQUET-1521: [C++] Use pure virtual interfaces for parquet::TypedColumnWriter, remove use of 'extern template class'

This is an automated email from the ASF dual-hosted git repository.

uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ce2655  PARQUET-1521: [C++] Use pure virtual interfaces for parquet::TypedColumnWriter, remove use of 'extern template class'
7ce2655 is described below

commit 7ce26553b8ce78085751e4a4ae603d4043abf337
Author: Wes McKinney <we...@apache.org>
AuthorDate: Tue Feb 5 15:15:43 2019 +0100

    PARQUET-1521: [C++] Use pure virtual interfaces for parquet::TypedColumnWriter, remove use of 'extern template class'
    
    This follows the corresponding work in TypedColumnReader. The public API is unchanged, as can be verified by the lack of changes to the unit tests.
    
    Author: Wes McKinney <we...@apache.org>
    
    Closes #3551 from wesm/PARQUET-1521 and squashes the following commits:
    
    aa6687a9 <Wes McKinney> Fix clang warnings
    33555044 <Wes McKinney> Print build warning level
    b657ac93 <Wes McKinney> Fix parquet-column-io-benchmark
    61204dec <Wes McKinney> Refactor TypedColumnWriter implementation to be based on pure virtual interface, remove use of extern template class
---
 cpp/cmake_modules/SetupCxxFlags.cmake  |   2 +
 cpp/src/parquet/column-io-benchmark.cc |  26 +-
 cpp/src/parquet/column_writer.cc       | 450 ++++++++++++++++++++++-----------
 cpp/src/parquet/column_writer.h        | 208 +++------------
 4 files changed, 350 insertions(+), 336 deletions(-)

diff --git a/cpp/cmake_modules/SetupCxxFlags.cmake b/cpp/cmake_modules/SetupCxxFlags.cmake
index 44ca22f..43dab02 100644
--- a/cpp/cmake_modules/SetupCxxFlags.cmake
+++ b/cpp/cmake_modules/SetupCxxFlags.cmake
@@ -111,6 +111,8 @@ if (NOT BUILD_WARNING_LEVEL)
 endif(NOT BUILD_WARNING_LEVEL)
 string(TOUPPER ${BUILD_WARNING_LEVEL} BUILD_WARNING_LEVEL)
 
+message(STATUS "Arrow build warning level: ${BUILD_WARNING_LEVEL}")
+
 if ("${BUILD_WARNING_LEVEL}" STREQUAL "CHECKIN")
   # Pre-checkin builds
   if ("${COMPILER_FAMILY}" STREQUAL "msvc")
diff --git a/cpp/src/parquet/column-io-benchmark.cc b/cpp/src/parquet/column-io-benchmark.cc
index c648d56..762bcb7 100644
--- a/cpp/src/parquet/column-io-benchmark.cc
+++ b/cpp/src/parquet/column-io-benchmark.cc
@@ -30,14 +30,15 @@ using schema::PrimitiveNode;
 
 namespace benchmark {
 
-std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst,
+std::shared_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst,
                                          ColumnChunkMetaDataBuilder* metadata,
                                          ColumnDescriptor* schema,
                                          const WriterProperties* properties) {
   std::unique_ptr<PageWriter> pager =
       PageWriter::Open(dst, Compression::UNCOMPRESSED, metadata);
-  return std::unique_ptr<Int64Writer>(new Int64Writer(
-      metadata, std::move(pager), false /*use_dictionary*/, Encoding::PLAIN, properties));
+  std::shared_ptr<ColumnWriter> writer =
+      ColumnWriter::Make(metadata, std::move(pager), properties);
+  return std::static_pointer_cast<Int64Writer>(writer);
 }
 
 std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {
@@ -65,14 +66,17 @@ static void BM_WriteInt64Column(::benchmark::State& state) {
   std::vector<int16_t> definition_levels(state.range(0), 1);
   std::vector<int16_t> repetition_levels(state.range(0), 0);
   std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
-  WriterProperties::Builder builder;
-  std::shared_ptr<WriterProperties> properties = builder.compression(codec)->build();
+  std::shared_ptr<WriterProperties> properties = WriterProperties::Builder()
+                                                     .compression(codec)
+                                                     ->encoding(Encoding::PLAIN)
+                                                     ->disable_dictionary()
+                                                     ->build();
   auto metadata = ColumnChunkMetaDataBuilder::Make(
       properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
 
   while (state.KeepRunning()) {
     InMemoryOutputStream stream;
-    std::unique_ptr<Int64Writer> writer = BuildWriter(
+    std::shared_ptr<Int64Writer> writer = BuildWriter(
         state.range(0), &stream, metadata.get(), schema.get(), properties.get());
     writer->WriteBatch(values.size(), definition_levels.data(), repetition_levels.data(),
                        values.data());
@@ -125,13 +129,17 @@ static void BM_ReadInt64Column(::benchmark::State& state) {
   std::vector<int16_t> definition_levels(state.range(0), 1);
   std::vector<int16_t> repetition_levels(state.range(0), 0);
   std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
-  WriterProperties::Builder builder;
-  std::shared_ptr<WriterProperties> properties = builder.compression(codec)->build();
+  std::shared_ptr<WriterProperties> properties = WriterProperties::Builder()
+                                                     .compression(codec)
+                                                     ->encoding(Encoding::PLAIN)
+                                                     ->disable_dictionary()
+                                                     ->build();
+
   auto metadata = ColumnChunkMetaDataBuilder::Make(
       properties, schema.get(), reinterpret_cast<uint8_t*>(&thrift_metadata));
 
   InMemoryOutputStream stream;
-  std::unique_ptr<Int64Writer> writer = BuildWriter(
+  std::shared_ptr<Int64Writer> writer = BuildWriter(
       state.range(0), &stream, metadata.get(), schema.get(), properties.get());
   writer->WriteBatch(values.size(), definition_levels.data(), repetition_levels.data(),
                      values.data());
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 0919a3f..47a1256 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -353,57 +353,148 @@ std::shared_ptr<WriterProperties> default_writer_properties() {
   return default_writer_properties;
 }
 
-ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
-                           std::unique_ptr<PageWriter> pager, bool has_dictionary,
-                           Encoding::type encoding, const WriterProperties* properties)
-    : metadata_(metadata),
-      descr_(metadata->descr()),
-      pager_(std::move(pager)),
-      has_dictionary_(has_dictionary),
-      encoding_(encoding),
-      properties_(properties),
-      allocator_(properties->memory_pool()),
-      num_buffered_values_(0),
-      num_buffered_encoded_values_(0),
-      rows_written_(0),
-      total_bytes_written_(0),
-      total_compressed_bytes_(0),
-      closed_(false),
-      fallback_(false) {
-  definition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
-  repetition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
-  definition_levels_rle_ =
-      std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
-  repetition_levels_rle_ =
-      std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
-  uncompressed_data_ =
-      std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
-  if (pager_->has_compressor()) {
-    compressed_data_ =
+class ColumnWriterImpl {
+ public:
+  ColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
+                   std::unique_ptr<PageWriter> pager, const bool use_dictionary,
+                   Encoding::type encoding, const WriterProperties* properties)
+      : metadata_(metadata),
+        descr_(metadata->descr()),
+        pager_(std::move(pager)),
+        has_dictionary_(use_dictionary),
+        encoding_(encoding),
+        properties_(properties),
+        allocator_(properties->memory_pool()),
+        num_buffered_values_(0),
+        num_buffered_encoded_values_(0),
+        rows_written_(0),
+        total_bytes_written_(0),
+        total_compressed_bytes_(0),
+        closed_(false),
+        fallback_(false) {
+    definition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
+    repetition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
+    definition_levels_rle_ =
+        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+    repetition_levels_rle_ =
         std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+    uncompressed_data_ =
+        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+    if (pager_->has_compressor()) {
+      compressed_data_ =
+          std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+    }
   }
-}
 
-void ColumnWriter::InitSinks() {
-  definition_levels_sink_->Clear();
-  repetition_levels_sink_->Clear();
-}
+  virtual ~ColumnWriterImpl() = default;
 
-void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
-  DCHECK(!closed_);
-  definition_levels_sink_->Write(reinterpret_cast<const uint8_t*>(levels),
-                                 sizeof(int16_t) * num_levels);
-}
+  int64_t Close();
 
-void ColumnWriter::WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) {
-  DCHECK(!closed_);
-  repetition_levels_sink_->Write(reinterpret_cast<const uint8_t*>(levels),
-                                 sizeof(int16_t) * num_levels);
-}
+ protected:
+  virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
+
+  // Serializes Dictionary Page if enabled
+  virtual void WriteDictionaryPage() = 0;
+
+  // Plain-encoded statistics of the current page
+  virtual EncodedStatistics GetPageStatistics() = 0;
+
+  // Plain-encoded statistics of the whole chunk
+  virtual EncodedStatistics GetChunkStatistics() = 0;
+
+  // Merges page statistics into chunk statistics, then resets the values
+  virtual void ResetPageStatistics() = 0;
+
+  // Adds Data Pages to an in memory buffer in dictionary encoding mode
+  // Serializes the Data Pages in other encoding modes
+  void AddDataPage();
+
+  // Serializes Data Pages
+  void WriteDataPage(const CompressedDataPage& page);
+
+  // Write multiple definition levels
+  void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
+    DCHECK(!closed_);
+    definition_levels_sink_->Write(reinterpret_cast<const uint8_t*>(levels),
+                                   sizeof(int16_t) * num_levels);
+  }
+
+  // Write multiple repetition levels
+  void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) {
+    DCHECK(!closed_);
+    repetition_levels_sink_->Write(reinterpret_cast<const uint8_t*>(levels),
+                                   sizeof(int16_t) * num_levels);
+  }
+
+  // RLE encode the src_buffer into dest_buffer and return the encoded size
+  int64_t RleEncodeLevels(const Buffer& src_buffer, ResizableBuffer* dest_buffer,
+                          int16_t max_level);
+
+  // Serialize the buffered Data Pages
+  void FlushBufferedDataPages();
+
+  ColumnChunkMetaDataBuilder* metadata_;
+  const ColumnDescriptor* descr_;
+
+  std::unique_ptr<PageWriter> pager_;
+
+  bool has_dictionary_;
+  Encoding::type encoding_;
+  const WriterProperties* properties_;
+
+  LevelEncoder level_encoder_;
+
+  ::arrow::MemoryPool* allocator_;
+
+  // The total number of values stored in the data page. This is the maximum of
+  // the number of encoded definition levels or encoded values. For
+  // non-repeated, required columns, this is equal to the number of encoded
+  // values. For repeated or optional values, there may be fewer data values
+  // than levels, and this tells you how many encoded levels there are in that
+  // case.
+  int64_t num_buffered_values_;
+
+  // The total number of stored values. For repeated or optional values, this
+  // number may be lower than num_buffered_values_.
+  int64_t num_buffered_encoded_values_;
+
+  // Total number of rows written with this ColumnWriter
+  int rows_written_;
+
+  // Records the total number of bytes written by the serializer
+  int64_t total_bytes_written_;
+
+  // Records the current number of compressed bytes in a column
+  int64_t total_compressed_bytes_;
+
+  // Flag to check if the Writer has been closed
+  bool closed_;
+
+  // Flag to infer if dictionary encoding has fallen back to PLAIN
+  bool fallback_;
+
+  std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
+  std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
+
+  std::shared_ptr<ResizableBuffer> definition_levels_rle_;
+  std::shared_ptr<ResizableBuffer> repetition_levels_rle_;
+
+  std::shared_ptr<ResizableBuffer> uncompressed_data_;
+  std::shared_ptr<ResizableBuffer> compressed_data_;
+
+  std::vector<CompressedDataPage> data_pages_;
+
+ private:
+  void InitSinks() {
+    definition_levels_sink_->Clear();
+    repetition_levels_sink_->Clear();
+  }
+};
 
 // return the size of the encoded buffer
-int64_t ColumnWriter::RleEncodeLevels(const Buffer& src_buffer,
-                                      ResizableBuffer* dest_buffer, int16_t max_level) {
+int64_t ColumnWriterImpl::RleEncodeLevels(const Buffer& src_buffer,
+                                          ResizableBuffer* dest_buffer,
+                                          int16_t max_level) {
   // TODO: This only works with due to some RLE specifics
   int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
                                                  static_cast<int>(num_buffered_values_)) +
@@ -425,7 +516,7 @@ int64_t ColumnWriter::RleEncodeLevels(const Buffer& src_buffer,
   return encoded_size;
 }
 
-void ColumnWriter::AddDataPage() {
+void ColumnWriterImpl::AddDataPage() {
   int64_t definition_levels_rle_size = 0;
   int64_t repetition_levels_rle_size = 0;
 
@@ -493,11 +584,11 @@ void ColumnWriter::AddDataPage() {
   num_buffered_encoded_values_ = 0;
 }
 
-void ColumnWriter::WriteDataPage(const CompressedDataPage& page) {
+void ColumnWriterImpl::WriteDataPage(const CompressedDataPage& page) {
   total_bytes_written_ += pager_->WriteDataPage(page);
 }
 
-int64_t ColumnWriter::Close() {
+int64_t ColumnWriterImpl::Close() {
   if (!closed_) {
     closed_ = true;
     if (has_dictionary_ && !fallback_) {
@@ -525,7 +616,7 @@ int64_t ColumnWriter::Close() {
   return total_bytes_written_;
 }
 
-void ColumnWriter::FlushBufferedDataPages() {
+void ColumnWriterImpl::FlushBufferedDataPages() {
   // Write all outstanding data to a new page
   if (num_buffered_values_ > 0) {
     AddDataPage();
@@ -540,47 +631,123 @@ void ColumnWriter::FlushBufferedDataPages() {
 // ----------------------------------------------------------------------
 // TypedColumnWriter
 
-template <typename Type>
-TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
-                                           std::unique_ptr<PageWriter> pager,
-                                           const bool use_dictionary,
-                                           Encoding::type encoding,
-                                           const WriterProperties* properties)
-    : ColumnWriter(metadata, std::move(pager), use_dictionary, encoding, properties) {
-  current_encoder_ = MakeEncoder(Type::type_num, encoding, use_dictionary, descr_,
-                                 properties->memory_pool());
-
-  if (properties->statistics_enabled(descr_->path()) &&
-      (SortOrder::UNKNOWN != descr_->sort_order())) {
-    page_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
-    chunk_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
+template <typename DType>
+class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<DType> {
+ public:
+  using T = typename DType::c_type;
+
+  TypedColumnWriterImpl(ColumnChunkMetaDataBuilder* metadata,
+                        std::unique_ptr<PageWriter> pager, const bool use_dictionary,
+                        Encoding::type encoding, const WriterProperties* properties)
+      : ColumnWriterImpl(metadata, std::move(pager), use_dictionary, encoding,
+                         properties) {
+    current_encoder_ = MakeEncoder(DType::type_num, encoding, use_dictionary, descr_,
+                                   properties->memory_pool());
+
+    if (properties->statistics_enabled(descr_->path()) &&
+        (SortOrder::UNKNOWN != descr_->sort_order())) {
+      page_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
+      chunk_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
+    }
+  }
+
+  int64_t Close() override { return ColumnWriterImpl::Close(); }
+
+  void WriteBatch(int64_t num_values, const int16_t* def_levels,
+                  const int16_t* rep_levels, const T* values) override;
+
+  void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
+                        const int16_t* rep_levels, const uint8_t* valid_bits,
+                        int64_t valid_bits_offset, const T* values) override;
+
+  int64_t EstimatedBufferedValueBytes() const override {
+    return current_encoder_->EstimatedDataEncodedSize();
   }
-}
+
+ protected:
+  std::shared_ptr<Buffer> GetValuesBuffer() override {
+    return current_encoder_->FlushValues();
+  }
+  void WriteDictionaryPage() override;
+
+  // Checks if the Dictionary Page size limit is reached
+  // If the limit is reached, the Dictionary and Data Pages are serialized
+  // The encoding is switched to PLAIN
+  void CheckDictionarySizeLimit();
+
+  EncodedStatistics GetPageStatistics() override {
+    EncodedStatistics result;
+    if (page_statistics_) result = page_statistics_->Encode();
+    return result;
+  }
+
+  EncodedStatistics GetChunkStatistics() override {
+    EncodedStatistics result;
+    if (chunk_statistics_) result = chunk_statistics_->Encode();
+    return result;
+  }
+
+  void ResetPageStatistics() override;
+
+  Type::type type() const override { return descr_->physical_type(); }
+
+  const ColumnDescriptor* descr() const override { return descr_; }
+
+  int64_t rows_written() const override { return rows_written_; }
+
+  int64_t total_compressed_bytes() const override { return total_compressed_bytes_; }
+
+  int64_t total_bytes_written() const override { return total_bytes_written_; }
+
+  const WriterProperties* properties() override { return properties_; }
+
+ private:
+  inline int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
+                                const int16_t* rep_levels, const T* values);
+
+  inline int64_t WriteMiniBatchSpaced(int64_t num_values, const int16_t* def_levels,
+                                      const int16_t* rep_levels,
+                                      const uint8_t* valid_bits,
+                                      int64_t valid_bits_offset, const T* values,
+                                      int64_t* num_spaced_written);
+
+  // Write values to a temporary buffer before they are encoded into pages
+  void WriteValues(int64_t num_values, const T* values);
+  void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits,
+                         int64_t valid_bits_offset, const T* values);
+
+  using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
+  std::unique_ptr<Encoder> current_encoder_;
+
+  typedef TypedRowGroupStatistics<DType> TypedStats;
+  std::unique_ptr<TypedStats> page_statistics_;
+  std::unique_ptr<TypedStats> chunk_statistics_;
+};
 
 // Only one Dictionary Page is written.
 // Fallback to PLAIN if dictionary page limit is reached.
-template <typename Type>
-void TypedColumnWriter<Type>::CheckDictionarySizeLimit() {
+template <typename DType>
+void TypedColumnWriterImpl<DType>::CheckDictionarySizeLimit() {
   // We have to dynamic cast here because TypedEncoder<Type> as some compilers
   // don't want to cast through virtual inheritance
-  auto dict_encoder = dynamic_cast<DictEncoder<Type>*>(current_encoder_.get());
+  auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
   if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) {
     WriteDictionaryPage();
     // Serialize the buffered Dictionary Indicies
     FlushBufferedDataPages();
     fallback_ = true;
     // Only PLAIN encoding is supported for fallback in V1
-    current_encoder_ = MakeEncoder(Type::type_num, Encoding::PLAIN, false, descr_,
+    current_encoder_ = MakeEncoder(DType::type_num, Encoding::PLAIN, false, descr_,
                                    properties_->memory_pool());
     encoding_ = Encoding::PLAIN;
   }
 }
 
-template <typename Type>
-void TypedColumnWriter<Type>::WriteDictionaryPage() {
+template <typename DType>
+void TypedColumnWriterImpl<DType>::WriteDictionaryPage() {
   // We have to dynamic cast here because TypedEncoder<Type> as some compilers
   // don't want to cast through virtual inheritance
-  auto dict_encoder = dynamic_cast<DictEncoder<Type>*>(current_encoder_.get());
+  auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
   DCHECK(dict_encoder);
   std::shared_ptr<ResizableBuffer> buffer =
       AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
@@ -591,22 +758,8 @@ void TypedColumnWriter<Type>::WriteDictionaryPage() {
   total_bytes_written_ += pager_->WriteDictionaryPage(page);
 }
 
-template <typename Type>
-EncodedStatistics TypedColumnWriter<Type>::GetPageStatistics() {
-  EncodedStatistics result;
-  if (page_statistics_) result = page_statistics_->Encode();
-  return result;
-}
-
-template <typename Type>
-EncodedStatistics TypedColumnWriter<Type>::GetChunkStatistics() {
-  EncodedStatistics result;
-  if (chunk_statistics_) result = chunk_statistics_->Encode();
-  return result;
-}
-
-template <typename Type>
-void TypedColumnWriter<Type>::ResetPageStatistics() {
+template <typename DType>
+void TypedColumnWriterImpl<DType>::ResetPageStatistics() {
   if (chunk_statistics_ != nullptr) {
     chunk_statistics_->Merge(*page_statistics_);
     page_statistics_->Reset();
@@ -614,58 +767,13 @@ void TypedColumnWriter<Type>::ResetPageStatistics() {
 }
 
 // ----------------------------------------------------------------------
-// Dynamic column writer constructor
-
-std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata,
-                                                 std::unique_ptr<PageWriter> pager,
-                                                 const WriterProperties* properties) {
-  const ColumnDescriptor* descr = metadata->descr();
-  const bool use_dictionary = properties->dictionary_enabled(descr->path()) &&
-                              descr->physical_type() != Type::BOOLEAN;
-  Encoding::type encoding = properties->encoding(descr->path());
-  if (use_dictionary) {
-    encoding = properties->dictionary_index_encoding();
-  }
-  switch (descr->physical_type()) {
-    case Type::BOOLEAN:
-      return std::make_shared<BoolWriter>(metadata, std::move(pager), use_dictionary,
-                                          encoding, properties);
-    case Type::INT32:
-      return std::make_shared<Int32Writer>(metadata, std::move(pager), use_dictionary,
-                                           encoding, properties);
-    case Type::INT64:
-      return std::make_shared<Int64Writer>(metadata, std::move(pager), use_dictionary,
-                                           encoding, properties);
-    case Type::INT96:
-      return std::make_shared<Int96Writer>(metadata, std::move(pager), use_dictionary,
-                                           encoding, properties);
-    case Type::FLOAT:
-      return std::make_shared<FloatWriter>(metadata, std::move(pager), use_dictionary,
-                                           encoding, properties);
-    case Type::DOUBLE:
-      return std::make_shared<DoubleWriter>(metadata, std::move(pager), use_dictionary,
-                                            encoding, properties);
-    case Type::BYTE_ARRAY:
-      return std::make_shared<ByteArrayWriter>(metadata, std::move(pager), use_dictionary,
-                                               encoding, properties);
-    case Type::FIXED_LEN_BYTE_ARRAY:
-      return std::make_shared<FixedLenByteArrayWriter>(
-          metadata, std::move(pager), use_dictionary, encoding, properties);
-    default:
-      ParquetException::NYI("type reader not implemented");
-  }
-  // Unreachable code, but supress compiler warning
-  return std::shared_ptr<ColumnWriter>(nullptr);
-}
-
-// ----------------------------------------------------------------------
 // Instantiate templated classes
 
 template <typename DType>
-inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
-                                                        const int16_t* def_levels,
-                                                        const int16_t* rep_levels,
-                                                        const T* values) {
+int64_t TypedColumnWriterImpl<DType>::WriteMiniBatch(int64_t num_values,
+                                                     const int16_t* def_levels,
+                                                     const int16_t* rep_levels,
+                                                     const T* values) {
   int64_t values_to_write = 0;
   // If the field is required and non-repeated, there are no definition levels
   if (descr_->max_definition_level() > 0) {
@@ -722,7 +830,7 @@ inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
 }
 
 template <typename DType>
-inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(
+int64_t TypedColumnWriterImpl<DType>::WriteMiniBatchSpaced(
     int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
     const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values,
     int64_t* num_spaced_written) {
@@ -793,8 +901,10 @@ inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(
 }
 
 template <typename DType>
-void TypedColumnWriter<DType>::WriteBatch(int64_t num_values, const int16_t* def_levels,
-                                          const int16_t* rep_levels, const T* values) {
+void TypedColumnWriterImpl<DType>::WriteBatch(int64_t num_values,
+                                              const int16_t* def_levels,
+                                              const int16_t* rep_levels,
+                                              const T* values) {
   // We check for DataPage limits only after we have inserted the values. If a user
   // writes a large number of values, the DataPage size can be much above the limit.
   // The purpose of this chunking is to bound this. Even if a user writes large number
@@ -817,7 +927,7 @@ void TypedColumnWriter<DType>::WriteBatch(int64_t num_values, const int16_t* def
 }
 
 template <typename DType>
-void TypedColumnWriter<DType>::WriteBatchSpaced(
+void TypedColumnWriterImpl<DType>::WriteBatchSpaced(
     int64_t num_values, const int16_t* def_levels, const int16_t* rep_levels,
     const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values) {
   // We check for DataPage limits only after we have inserted the values. If a user
@@ -845,27 +955,63 @@ void TypedColumnWriter<DType>::WriteBatchSpaced(
 }
 
 template <typename DType>
-void TypedColumnWriter<DType>::WriteValues(int64_t num_values, const T* values) {
+void TypedColumnWriterImpl<DType>::WriteValues(int64_t num_values, const T* values) {
   dynamic_cast<ValueEncoderType*>(current_encoder_.get())
       ->Put(values, static_cast<int>(num_values));
 }
 
 template <typename DType>
-void TypedColumnWriter<DType>::WriteValuesSpaced(int64_t num_values,
-                                                 const uint8_t* valid_bits,
-                                                 int64_t valid_bits_offset,
-                                                 const T* values) {
+void TypedColumnWriterImpl<DType>::WriteValuesSpaced(int64_t num_values,
+                                                     const uint8_t* valid_bits,
+                                                     int64_t valid_bits_offset,
+                                                     const T* values) {
   dynamic_cast<ValueEncoderType*>(current_encoder_.get())
       ->PutSpaced(values, static_cast<int>(num_values), valid_bits, valid_bits_offset);
 }
 
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<BooleanType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int32Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int64Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int96Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<FloatType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<DoubleType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<ByteArrayType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<FLBAType>;
+// ----------------------------------------------------------------------
+// Dynamic column writer constructor
+
+std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata,
+                                                 std::unique_ptr<PageWriter> pager,
+                                                 const WriterProperties* properties) {
+  const ColumnDescriptor* descr = metadata->descr();
+  const bool use_dictionary = properties->dictionary_enabled(descr->path()) &&
+                              descr->physical_type() != Type::BOOLEAN;
+  Encoding::type encoding = properties->encoding(descr->path());
+  if (use_dictionary) {
+    encoding = properties->dictionary_index_encoding();
+  }
+  switch (descr->physical_type()) {
+    case Type::BOOLEAN:
+      return std::make_shared<TypedColumnWriterImpl<BooleanType>>(
+          metadata, std::move(pager), use_dictionary, encoding, properties);
+    case Type::INT32:
+      return std::make_shared<TypedColumnWriterImpl<Int32Type>>(
+          metadata, std::move(pager), use_dictionary, encoding, properties);
+    case Type::INT64:
+      return std::make_shared<TypedColumnWriterImpl<Int64Type>>(
+          metadata, std::move(pager), use_dictionary, encoding, properties);
+    case Type::INT96:
+      return std::make_shared<TypedColumnWriterImpl<Int96Type>>(
+          metadata, std::move(pager), use_dictionary, encoding, properties);
+    case Type::FLOAT:
+      return std::make_shared<TypedColumnWriterImpl<FloatType>>(
+          metadata, std::move(pager), use_dictionary, encoding, properties);
+    case Type::DOUBLE:
+      return std::make_shared<TypedColumnWriterImpl<DoubleType>>(
+          metadata, std::move(pager), use_dictionary, encoding, properties);
+    case Type::BYTE_ARRAY:
+      return std::make_shared<TypedColumnWriterImpl<ByteArrayType>>(
+          metadata, std::move(pager), use_dictionary, encoding, properties);
+    case Type::FIXED_LEN_BYTE_ARRAY:
+      return std::make_shared<TypedColumnWriterImpl<FLBAType>>(
+          metadata, std::move(pager), use_dictionary, encoding, properties);
+    default:
+      ParquetException::NYI("type reader not implemented");
+  }
+  // Unreachable code, but supress compiler warning
+  return std::shared_ptr<ColumnWriter>(nullptr);
+}
 
 }  // namespace parquet
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index 254bf0d..5b9efb4 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -105,147 +105,47 @@ class PARQUET_EXPORT PageWriter {
 static constexpr int WRITE_BATCH_SIZE = 1000;
 class PARQUET_EXPORT ColumnWriter {
  public:
-  ColumnWriter(ColumnChunkMetaDataBuilder*, std::unique_ptr<PageWriter>,
-               bool has_dictionary, Encoding::type encoding,
-               const WriterProperties* properties);
-
   virtual ~ColumnWriter() = default;
 
   static std::shared_ptr<ColumnWriter> Make(ColumnChunkMetaDataBuilder*,
                                             std::unique_ptr<PageWriter>,
                                             const WriterProperties* properties);
 
-  Type::type type() const { return descr_->physical_type(); }
-
-  const ColumnDescriptor* descr() const { return descr_; }
-
-  /**
-   * Closes the ColumnWriter, commits any buffered values to pages.
-   *
-   * @return Total size of the column in bytes
-   */
-  int64_t Close();
-
-  int64_t rows_written() const { return rows_written_; }
-
-  // Only considers the size of the compressed pages + page header
-  // Some values might be still buffered an not written to a page yet
-  int64_t total_compressed_bytes() const { return total_compressed_bytes_; }
-
-  int64_t total_bytes_written() const { return total_bytes_written_; }
-
-  const WriterProperties* properties() { return properties_; }
-
- protected:
-  virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
-
-  // Serializes Dictionary Page if enabled
-  virtual void WriteDictionaryPage() = 0;
-
-  // Checks if the Dictionary Page size limit is reached
-  // If the limit is reached, the Dictionary and Data Pages are serialized
-  // The encoding is switched to PLAIN
-
-  virtual void CheckDictionarySizeLimit() = 0;
+  /// \brief Closes the ColumnWriter, commits any buffered values to pages.
+  /// \return Total size of the column in bytes
+  virtual int64_t Close() = 0;
 
-  // Plain-encoded statistics of the current page
-  virtual EncodedStatistics GetPageStatistics() = 0;
+  /// \brief The physical Parquet type of the column
+  virtual Type::type type() const = 0;
 
-  // Plain-encoded statistics of the whole chunk
-  virtual EncodedStatistics GetChunkStatistics() = 0;
-
-  // Merges page statistics into chunk statistics, then resets the values
-  virtual void ResetPageStatistics() = 0;
-
-  // Adds Data Pages to an in memory buffer in dictionary encoding mode
-  // Serializes the Data Pages in other encoding modes
-  void AddDataPage();
-
-  // Serializes Data Pages
-  void WriteDataPage(const CompressedDataPage& page);
-
-  // Write multiple definition levels
-  void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels);
-
-  // Write multiple repetition levels
-  void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels);
-
-  // RLE encode the src_buffer into dest_buffer and return the encoded size
-  int64_t RleEncodeLevels(const Buffer& src_buffer, ResizableBuffer* dest_buffer,
-                          int16_t max_level);
-
-  // Serialize the buffered Data Pages
-  void FlushBufferedDataPages();
-
-  ColumnChunkMetaDataBuilder* metadata_;
-  const ColumnDescriptor* descr_;
-
-  std::unique_ptr<PageWriter> pager_;
-
-  bool has_dictionary_;
-  Encoding::type encoding_;
-  const WriterProperties* properties_;
+  /// \brief The schema for the column
+  virtual const ColumnDescriptor* descr() const = 0;
 
-  LevelEncoder level_encoder_;
+  /// \brief The number of rows written so far
+  virtual int64_t rows_written() const = 0;
 
-  ::arrow::MemoryPool* allocator_;
+  /// \brief The total size of the compressed pages + page headers. Some values
+  /// may still be buffered and not yet written to a page
+  virtual int64_t total_compressed_bytes() const = 0;
 
-  // The total number of values stored in the data page. This is the maximum of
-  // the number of encoded definition levels or encoded values. For
-  // non-repeated, required columns, this is equal to the number of encoded
-  // values. For repeated or optional values, there may be fewer data values
-  // than levels, and this tells you how many encoded levels there are in that
-  // case.
-  int64_t num_buffered_values_;
+  /// \brief The total number of bytes written as serialized data and
+  /// dictionary pages to the ColumnChunk so far
+  virtual int64_t total_bytes_written() const = 0;
 
-  // The total number of stored values. For repeated or optional values, this
-  // number may be lower than num_buffered_values_.
-  int64_t num_buffered_encoded_values_;
-
-  // Total number of rows written with this ColumnWriter
-  int rows_written_;
-
-  // Records the total number of bytes written by the serializer
-  int64_t total_bytes_written_;
-
-  // Records the current number of compressed bytes in a column
-  int64_t total_compressed_bytes_;
-
-  // Flag to check if the Writer has been closed
-  bool closed_;
-
-  // Flag to infer if dictionary encoding has fallen back to PLAIN
-  bool fallback_;
-
-  std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
-  std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
-
-  std::shared_ptr<ResizableBuffer> definition_levels_rle_;
-  std::shared_ptr<ResizableBuffer> repetition_levels_rle_;
-
-  std::shared_ptr<ResizableBuffer> uncompressed_data_;
-  std::shared_ptr<ResizableBuffer> compressed_data_;
-
-  std::vector<CompressedDataPage> data_pages_;
-
- private:
-  void InitSinks();
+  /// \brief The file-level writer properties
+  virtual const WriterProperties* properties() = 0;
 };
 
 // API to write values to a single column. This is the main client facing API.
 template <typename DType>
-class PARQUET_TEMPLATE_CLASS_EXPORT TypedColumnWriter : public ColumnWriter {
+class TypedColumnWriter : public ColumnWriter {
  public:
-  typedef typename DType::c_type T;
-
-  TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
-                    std::unique_ptr<PageWriter> pager, const bool use_dictionary,
-                    Encoding::type encoding, const WriterProperties* properties);
+  using T = typename DType::c_type;
 
   // Write a batch of repetition levels, definition levels, and values to the
   // column.
-  void WriteBatch(int64_t num_values, const int16_t* def_levels,
-                  const int16_t* rep_levels, const T* values);
+  virtual void WriteBatch(int64_t num_values, const int16_t* def_levels,
+                          const int16_t* rep_levels, const T* values) = 0;
 
   /// Write a batch of repetition levels, definition levels, and values to the
   /// column.
@@ -273,63 +173,21 @@ class PARQUET_TEMPLATE_CLASS_EXPORT TypedColumnWriter : public ColumnWriter {
   /// @param values The values in the lowest nested level including
   ///   spacing for nulls on the lowest levels; input has the length
   ///   of the number of rows on the lowest nesting level.
-  void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
-                        const int16_t* rep_levels, const uint8_t* valid_bits,
-                        int64_t valid_bits_offset, const T* values);
+  virtual void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
+                                const int16_t* rep_levels, const uint8_t* valid_bits,
+                                int64_t valid_bits_offset, const T* values) = 0;
 
   // Estimated size of the values that are not written to a page yet
-  int64_t EstimatedBufferedValueBytes() const {
-    return current_encoder_->EstimatedDataEncodedSize();
-  }
-
- protected:
-  std::shared_ptr<Buffer> GetValuesBuffer() override {
-    return current_encoder_->FlushValues();
-  }
-  void WriteDictionaryPage() override;
-  void CheckDictionarySizeLimit() override;
-  EncodedStatistics GetPageStatistics() override;
-  EncodedStatistics GetChunkStatistics() override;
-  void ResetPageStatistics() override;
-
- private:
-  int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
-                         const int16_t* rep_levels, const T* values);
-
-  int64_t WriteMiniBatchSpaced(int64_t num_values, const int16_t* def_levels,
-                               const int16_t* rep_levels, const uint8_t* valid_bits,
-                               int64_t valid_bits_offset, const T* values,
-                               int64_t* num_spaced_written);
-
-  // Write values to a temporary buffer before they are encoded into pages
-  void WriteValues(int64_t num_values, const T* values);
-  void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits,
-                         int64_t valid_bits_offset, const T* values);
-
-  using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
-  std::unique_ptr<Encoder> current_encoder_;
-
-  typedef TypedRowGroupStatistics<DType> TypedStats;
-  std::unique_ptr<TypedStats> page_statistics_;
-  std::unique_ptr<TypedStats> chunk_statistics_;
+  virtual int64_t EstimatedBufferedValueBytes() const = 0;
 };
 
-typedef TypedColumnWriter<BooleanType> BoolWriter;
-typedef TypedColumnWriter<Int32Type> Int32Writer;
-typedef TypedColumnWriter<Int64Type> Int64Writer;
-typedef TypedColumnWriter<Int96Type> Int96Writer;
-typedef TypedColumnWriter<FloatType> FloatWriter;
-typedef TypedColumnWriter<DoubleType> DoubleWriter;
-typedef TypedColumnWriter<ByteArrayType> ByteArrayWriter;
-typedef TypedColumnWriter<FLBAType> FixedLenByteArrayWriter;
-
-PARQUET_EXTERN_TEMPLATE TypedColumnWriter<BooleanType>;
-PARQUET_EXTERN_TEMPLATE TypedColumnWriter<Int32Type>;
-PARQUET_EXTERN_TEMPLATE TypedColumnWriter<Int64Type>;
-PARQUET_EXTERN_TEMPLATE TypedColumnWriter<Int96Type>;
-PARQUET_EXTERN_TEMPLATE TypedColumnWriter<FloatType>;
-PARQUET_EXTERN_TEMPLATE TypedColumnWriter<DoubleType>;
-PARQUET_EXTERN_TEMPLATE TypedColumnWriter<ByteArrayType>;
-PARQUET_EXTERN_TEMPLATE TypedColumnWriter<FLBAType>;
+using BoolWriter = TypedColumnWriter<BooleanType>;
+using Int32Writer = TypedColumnWriter<Int32Type>;
+using Int64Writer = TypedColumnWriter<Int64Type>;
+using Int96Writer = TypedColumnWriter<Int96Type>;
+using FloatWriter = TypedColumnWriter<FloatType>;
+using DoubleWriter = TypedColumnWriter<DoubleType>;
+using ByteArrayWriter = TypedColumnWriter<ByteArrayType>;
+using FixedLenByteArrayWriter = TypedColumnWriter<FLBAType>;
 
 }  // namespace parquet