Posted to commits@arrow.apache.org by we...@apache.org on 2019/08/09 01:22:01 UTC

[arrow] branch master updated: ARROW-6152: [C++][Parquet] Add parquet::ColumnWriter::WriteArrow method, refactor

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new b4c1763  ARROW-6152: [C++][Parquet] Add parquet::ColumnWriter::WriteArrow method, refactor
b4c1763 is described below

commit b4c1763024f6aabb562fb0b83643a71a6e8be5a1
Author: Wes McKinney <we...@apache.org>
AuthorDate: Thu Aug 8 20:21:42 2019 -0500

    ARROW-6152: [C++][Parquet] Add parquet::ColumnWriter::WriteArrow method, refactor
    
    * Write Arrow arrays directly to ColumnWriter, allowing internal optimizations and other features such as direct DictionaryArray writes
    * Refactor and streamline the implementation for maintainability and readability
    * Move the Arrow reader/writer properties to parquet/properties.h
    * Some minor miscellaneous code reorganization to support the above
    
    Functionally, the library is unchanged
    
    Closes #5036 from wesm/ARROW-6152 and squashes the following commits:
    
    b2ddcb3f3 <Wes McKinney> Regenerate arrowExports.cpp
    7a227f16f <Wes McKinney> Try to fix CI jobs, fix R use of deprecated API
    7bc795174 <Wes McKinney> Fix Python compilation
    0b0aaef82 <Wes McKinney> Simplify
    3331ff797 <Wes McKinney> IWYU
    5e5f00e8d <Wes McKinney> Restore functors, test suite passing again
    f832f8e19 <Wes McKinney> Compiles again, but lost a couple functors
    1dc38e02e <Wes McKinney> Checkpoint
    7bc4f9b15 <Wes McKinney> Refactoring
    31e180e63 <Wes McKinney> Some simplification, investigation
    
    Authored-by: Wes McKinney <we...@apache.org>
    Signed-off-by: Wes McKinney <we...@apache.org>
---
 cpp/src/parquet/arrow/arrow-reader-writer-test.cc |   5 +-
 cpp/src/parquet/arrow/reader.cc                   |  10 -
 cpp/src/parquet/arrow/reader.h                    |  50 +-
 cpp/src/parquet/arrow/reader_internal.cc          |   4 +-
 cpp/src/parquet/arrow/reader_internal.h           |  17 +-
 cpp/src/parquet/arrow/schema.cc                   |  79 +-
 cpp/src/parquet/arrow/schema.h                    |  10 +-
 cpp/src/parquet/arrow/writer.cc                   | 841 +++-------------------
 cpp/src/parquet/arrow/writer.h                    | 174 +----
 cpp/src/parquet/column_writer.cc                  | 635 ++++++++++++++--
 cpp/src/parquet/column_writer.h                   |  62 ++
 cpp/src/parquet/properties.cc                     |  11 +
 cpp/src/parquet/properties.h                      | 179 ++++-
 cpp/src/parquet/schema.cc                         |   1 -
 cpp/src/parquet/types.cc                          |  76 ++
 cpp/src/parquet/types.h                           |   6 +
 dev/lint/run_iwyu.sh                              |   1 +
 python/pyarrow/_parquet.pxd                       |  40 +-
 r/src/arrowExports.cpp                            |  22 +-
 r/src/parquet.cpp                                 |  19 +-
 20 files changed, 1069 insertions(+), 1173 deletions(-)
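
For orientation, a minimal sketch of the high-level entry point that now funnels into the new parquet::ColumnWriter::WriteArrow internally. This snippet is not part of the commit; it assumes a pre-built ::arrow::Table and leans on the parquet::arrow::WriteTable free function whose signature appears in the writer.cc diff below, relying on default arguments for the trailing properties parameters (an assumption, since the declaration is cut off in that hunk):

    #include "arrow/io/interfaces.h"
    #include "arrow/memory_pool.h"
    #include "arrow/status.h"
    #include "arrow/table.h"
    #include "parquet/arrow/writer.h"

    // Write `table` to `sink` in 64K-row row groups. Each column chunk is
    // dispatched through ColumnWriter::WriteArrow instead of the
    // TypedWriteBatch specializations this commit removes.
    ::arrow::Status WriteTableSketch(
        const ::arrow::Table& table,
        const std::shared_ptr<::arrow::io::OutputStream>& sink) {
      return parquet::arrow::WriteTable(table, ::arrow::default_memory_pool(),
                                        sink, /*chunk_size=*/64 * 1024);
    }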

diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index bb39d56..ee4ddcb 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -44,6 +44,7 @@
 #include "parquet/arrow/schema.h"
 #include "parquet/arrow/test-util.h"
 #include "parquet/arrow/writer.h"
+#include "parquet/column_writer.h"
 #include "parquet/file_writer.h"
 #include "parquet/test-util.h"
 
@@ -459,7 +460,7 @@ static std::shared_ptr<GroupNode> MakeSimpleSchema(const DataType& type,
         case ::arrow::Type::DECIMAL: {
           const auto& decimal_type =
               static_cast<const ::arrow::Decimal128Type&>(values_type);
-          byte_width = DecimalSize(decimal_type.precision());
+          byte_width = internal::DecimalSize(decimal_type.precision());
         } break;
         default:
           break;
@@ -470,7 +471,7 @@ static std::shared_ptr<GroupNode> MakeSimpleSchema(const DataType& type,
       break;
     case ::arrow::Type::DECIMAL: {
       const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(type);
-      byte_width = DecimalSize(decimal_type.precision());
+      byte_width = internal::DecimalSize(decimal_type.precision());
     } break;
     default:
       break;
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 931fd19..3d1ad76 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -19,7 +19,6 @@
 
 #include <algorithm>
 #include <cstring>
-#include <functional>
 #include <future>
 #include <numeric>
 #include <utility>
@@ -33,13 +32,11 @@
 #include "arrow/util/thread-pool.h"
 
 #include "parquet/arrow/reader_internal.h"
-#include "parquet/arrow/schema.h"
 #include "parquet/column_reader.h"
 #include "parquet/exception.h"
 #include "parquet/file_reader.h"
 #include "parquet/metadata.h"
 #include "parquet/properties.h"
-#include "parquet/schema-internal.h"
 #include "parquet/schema.h"
 
 using arrow::Array;
@@ -76,8 +73,6 @@ using parquet::internal::RecordReader;
 namespace parquet {
 namespace arrow {
 
-class ColumnChunkReaderImpl;
-
 class ColumnReaderImpl : public ColumnReader {
  public:
   enum ReaderType { PRIMITIVE, LIST, STRUCT };
@@ -91,11 +86,6 @@ class ColumnReaderImpl : public ColumnReader {
   virtual ReaderType type() const = 0;
 };
 
-ArrowReaderProperties default_arrow_reader_properties() {
-  static ArrowReaderProperties default_reader_props;
-  return default_reader_props;
-}
-
 // ----------------------------------------------------------------------
 // FileReaderImpl forward declaration
 
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index d0ce68a..1492e2e 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -20,9 +20,9 @@
 
 #include <cstdint>
 #include <memory>
-#include <unordered_set>
 #include <vector>
 
+#include "parquet/file_reader.h"
 #include "parquet/platform.h"
 #include "parquet/properties.h"
 
@@ -39,8 +39,6 @@ class Table;
 namespace parquet {
 
 class FileMetaData;
-class ParquetFileReader;
-class ReaderProperties;
 class SchemaDescriptor;
 
 namespace arrow {
@@ -49,52 +47,6 @@ class ColumnChunkReader;
 class ColumnReader;
 class RowGroupReader;
 
-static constexpr bool DEFAULT_USE_THREADS = false;
-
-// Default number of rows to read when using ::arrow::RecordBatchReader
-static constexpr int64_t DEFAULT_BATCH_SIZE = 64 * 1024;
-
-/// EXPERIMENTAL: Properties for configuring FileReader behavior.
-class PARQUET_EXPORT ArrowReaderProperties {
- public:
-  explicit ArrowReaderProperties(bool use_threads = DEFAULT_USE_THREADS)
-      : use_threads_(use_threads),
-        read_dict_indices_(),
-        batch_size_(DEFAULT_BATCH_SIZE) {}
-
-  void set_use_threads(bool use_threads) { use_threads_ = use_threads; }
-
-  bool use_threads() const { return use_threads_; }
-
-  void set_read_dictionary(int column_index, bool read_dict) {
-    if (read_dict) {
-      read_dict_indices_.insert(column_index);
-    } else {
-      read_dict_indices_.erase(column_index);
-    }
-  }
-  bool read_dictionary(int column_index) const {
-    if (read_dict_indices_.find(column_index) != read_dict_indices_.end()) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  void set_batch_size(int64_t batch_size) { batch_size_ = batch_size; }
-
-  int64_t batch_size() const { return batch_size_; }
-
- private:
-  bool use_threads_;
-  std::unordered_set<int> read_dict_indices_;
-  int64_t batch_size_;
-};
-
-/// EXPERIMENTAL: Constructs the default ArrowReaderProperties
-PARQUET_EXPORT
-ArrowReaderProperties default_arrow_reader_properties();
-
 // Arrow read adapter class for deserializing Parquet files as Arrow row
 // batches.
 //
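
Per the commit message, the ArrowReaderProperties class deleted above moves to parquet/properties.h, and the forward declaration added in reader_internal.h below places it directly in the parquet namespace. A minimal configuration sketch, using only the setters visible in the removed code and the relocated default_arrow_reader_properties() factory:

    #include "parquet/properties.h"

    // Enable multithreaded decoding, read column 0 as dictionary-encoded,
    // and keep the 64K default batch size explicit.
    parquet::ArrowReaderProperties props =
        parquet::default_arrow_reader_properties();
    props.set_use_threads(true);
    props.set_read_dictionary(/*column_index=*/0, /*read_dict=*/true);
    props.set_batch_size(64 * 1024);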
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index f41e875..bfc4094 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -24,7 +24,6 @@
 #include <memory>
 #include <string>
 #include <type_traits>
-#include <utility>
 #include <vector>
 
 #include <boost/algorithm/string/predicate.hpp>
@@ -32,8 +31,10 @@
 #include "arrow/array.h"
 #include "arrow/builder.h"
 #include "arrow/compute/kernel.h"
+#include "arrow/status.h"
 #include "arrow/table.h"
 #include "arrow/type.h"
+#include "arrow/type_traits.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/int-util.h"
 #include "arrow/util/logging.h"
@@ -42,6 +43,7 @@
 #include "parquet/arrow/reader.h"
 #include "parquet/column_reader.h"
 #include "parquet/platform.h"
+#include "parquet/properties.h"
 #include "parquet/schema.h"
 #include "parquet/types.h"
 
diff --git a/cpp/src/parquet/arrow/reader_internal.h b/cpp/src/parquet/arrow/reader_internal.h
index cbf44ec..4568e42 100644
--- a/cpp/src/parquet/arrow/reader_internal.h
+++ b/cpp/src/parquet/arrow/reader_internal.h
@@ -17,17 +17,19 @@
 
 #pragma once
 
+#include <cstdint>
 #include <deque>
+#include <functional>
 #include <memory>
 #include <unordered_map>
 #include <unordered_set>
+#include <utility>
 #include <vector>
 
-#include "arrow/status.h"
-
 #include "parquet/column_reader.h"
 #include "parquet/file_reader.h"
 #include "parquet/metadata.h"
+#include "parquet/platform.h"
 #include "parquet/schema.h"
 
 namespace arrow {
@@ -36,8 +38,6 @@ class Array;
 class ChunkedArray;
 class DataType;
 class Field;
-class MemoryPool;
-class Schema;
 
 }  // namespace arrow
 
@@ -45,17 +45,10 @@ using arrow::Status;
 
 namespace parquet {
 
-class ColumnDescriptor;
-
-namespace internal {
-
-class RecordReader;
-
-}  // namespace internal
+class ArrowReaderProperties;
 
 namespace arrow {
 
-class ArrowReaderProperties;
 class ColumnReaderImpl;
 
 // ----------------------------------------------------------------------
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index 82c8566..49d0dd9 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -18,18 +18,13 @@
 #include "parquet/arrow/schema.h"
 
 #include <string>
-#include <unordered_set>
-#include <utility>
 #include <vector>
 
 #include "arrow/type.h"
 #include "arrow/util/checked_cast.h"
-#include "arrow/util/logging.h"
 
-#include "parquet/arrow/writer.h"
 #include "parquet/exception.h"
 #include "parquet/properties.h"
-#include "parquet/schema-internal.h"
 #include "parquet/types.h"
 
 using arrow::Field;
@@ -262,7 +257,7 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
           static_cast<const ::arrow::Decimal128Type&>(*field->type());
       precision = decimal_type.precision();
       scale = decimal_type.scale();
-      length = DecimalSize(precision);
+      length = internal::DecimalSize(precision);
       PARQUET_CATCH_NOT_OK(logical_type = LogicalType::Decimal(precision, scale));
     } break;
     case ArrowTypeId::DATE32:
@@ -351,77 +346,5 @@ Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
                          out);
 }
 
-/// \brief Compute the number of bytes required to represent a decimal of a
-/// given precision. Taken from the Apache Impala codebase. The comments next
-/// to the return values are the maximum value that can be represented in 2's
-/// complement with the returned number of bytes.
-int32_t DecimalSize(int32_t precision) {
-  DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got "
-                          << precision;
-  DCHECK_LE(precision, 38) << "decimal precision must be less than or equal to 38, got "
-                           << precision;
-
-  switch (precision) {
-    case 1:
-    case 2:
-      return 1;  // 127
-    case 3:
-    case 4:
-      return 2;  // 32,767
-    case 5:
-    case 6:
-      return 3;  // 8,388,607
-    case 7:
-    case 8:
-    case 9:
-      return 4;  // 2,147,483,427
-    case 10:
-    case 11:
-      return 5;  // 549,755,813,887
-    case 12:
-    case 13:
-    case 14:
-      return 6;  // 140,737,488,355,327
-    case 15:
-    case 16:
-      return 7;  // 36,028,797,018,963,967
-    case 17:
-    case 18:
-      return 8;  // 9,223,372,036,854,775,807
-    case 19:
-    case 20:
-    case 21:
-      return 9;  // 2,361,183,241,434,822,606,847
-    case 22:
-    case 23:
-      return 10;  // 604,462,909,807,314,587,353,087
-    case 24:
-    case 25:
-    case 26:
-      return 11;  // 154,742,504,910,672,534,362,390,527
-    case 27:
-    case 28:
-      return 12;  // 39,614,081,257,132,168,796,771,975,167
-    case 29:
-    case 30:
-    case 31:
-      return 13;  // 10,141,204,801,825,835,211,973,625,643,007
-    case 32:
-    case 33:
-      return 14;  // 2,596,148,429,267,413,814,265,248,164,610,047
-    case 34:
-    case 35:
-      return 15;  // 664,613,997,892,457,936,451,903,530,140,172,287
-    case 36:
-    case 37:
-    case 38:
-      return 16;  // 170,141,183,460,469,231,731,687,303,715,884,105,727
-    default:
-      break;
-  }
-  DCHECK(false);
-  return -1;
-}
-
 }  // namespace arrow
 }  // namespace parquet
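
The DecimalSize function removed above is now invoked as internal::DecimalSize at the call sites in this diff; per the diffstat, its definition lands in parquet/types.cc. Its mapping from decimal precision to the minimal two's-complement byte width is unchanged. A small illustration, assuming the declaration resolves to parquet::internal::DecimalSize (consistent with the types.h/types.cc changes in the diffstat):

    // Values taken from the precision switch removed above:
    int32_t w1 = parquet::internal::DecimalSize(/*precision=*/2);   // 1 byte
    int32_t w2 = parquet::internal::DecimalSize(/*precision=*/9);   // 4 bytes
    int32_t w3 = parquet::internal::DecimalSize(/*precision=*/18);  // 8 bytes
    int32_t w4 = parquet::internal::DecimalSize(/*precision=*/38);  // 16 bytes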
diff --git a/cpp/src/parquet/arrow/schema.h b/cpp/src/parquet/arrow/schema.h
index b3cc66b..5ec9b0c 100644
--- a/cpp/src/parquet/arrow/schema.h
+++ b/cpp/src/parquet/arrow/schema.h
@@ -18,11 +18,8 @@
 #ifndef PARQUET_ARROW_SCHEMA_H
 #define PARQUET_ARROW_SCHEMA_H
 
-#include <cstdint>
 #include <memory>
-#include <vector>
 
-#include "parquet/metadata.h"
 #include "parquet/platform.h"
 #include "parquet/schema.h"
 
@@ -35,12 +32,11 @@ class Schema;
 
 namespace parquet {
 
+class ArrowWriterProperties;
 class WriterProperties;
 
 namespace arrow {
 
-class ArrowWriterProperties;
-
 PARQUET_EXPORT
 ::arrow::Status FieldToNode(const std::shared_ptr<::arrow::Field>& field,
                             const WriterProperties& properties,
@@ -58,11 +54,7 @@ PARQUET_EXPORT
                                 const WriterProperties& properties,
                                 std::shared_ptr<SchemaDescriptor>* out);
 
-PARQUET_EXPORT
-int32_t DecimalSize(int32_t precision);
-
 }  // namespace arrow
-
 }  // namespace parquet
 
 #endif  // PARQUET_ARROW_SCHEMA_H
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index ee3b880..fb437f1 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -18,7 +18,6 @@
 #include "parquet/arrow/writer.h"
 
 #include <algorithm>
-#include <cstddef>
 #include <deque>
 #include <type_traits>
 #include <utility>
@@ -28,12 +27,9 @@
 #include "arrow/buffer-builder.h"
 #include "arrow/compute/api.h"
 #include "arrow/table.h"
-#include "arrow/util/checked_cast.h"
+#include "arrow/type.h"
 #include "arrow/visitor_inline.h"
 
-#include "arrow/util/logging.h"
-
-#include "parquet/arrow/reader.h"
 #include "parquet/arrow/reader_internal.h"
 #include "parquet/arrow/schema.h"
 #include "parquet/column_writer.h"
@@ -70,12 +66,6 @@ using parquet::schema::GroupNode;
 namespace parquet {
 namespace arrow {
 
-std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties() {
-  static std::shared_ptr<ArrowWriterProperties> default_writer_properties =
-      ArrowWriterProperties::Builder().build();
-  return default_writer_properties;
-}
-
 namespace {
 
 class LevelBuilder {
@@ -293,31 +283,6 @@ Status LevelBuilder::VisitInline(const Array& array) {
   return VisitArrayInline(array, this);
 }
 
-struct ColumnWriterContext {
-  ColumnWriterContext(MemoryPool* memory_pool, ArrowWriterProperties* properties)
-      : memory_pool(memory_pool), properties(properties) {
-    this->data_buffer = AllocateBuffer(memory_pool);
-    this->def_levels_buffer = AllocateBuffer(memory_pool);
-  }
-
-  template <typename T>
-  Status GetScratchData(const int64_t num_values, T** out) {
-    RETURN_NOT_OK(this->data_buffer->Resize(num_values * sizeof(T), false));
-    *out = reinterpret_cast<T*>(this->data_buffer->mutable_data());
-    return Status::OK();
-  }
-
-  MemoryPool* memory_pool;
-  ArrowWriterProperties* properties;
-
-  // Buffer used for storing the data of an array converted to the physical type
-  // as expected by parquet-cpp.
-  std::shared_ptr<ResizableBuffer> data_buffer;
-
-  // We use the shared ownership of this buffer
-  std::shared_ptr<ResizableBuffer> def_levels_buffer;
-};
-
 Status GetLeafType(const ::arrow::DataType& type, ::arrow::Type::type* leaf_type) {
   if (type.id() == ::arrow::Type::LIST || type.id() == ::arrow::Type::STRUCT) {
     if (type.num_children() != 1) {
@@ -332,7 +297,7 @@ Status GetLeafType(const ::arrow::DataType& type, ::arrow::Type::type* leaf_type
 
 class ArrowColumnWriter {
  public:
-  ArrowColumnWriter(ColumnWriterContext* ctx, ColumnWriter* column_writer,
+  ArrowColumnWriter(ArrowWriteContext* ctx, ColumnWriter* column_writer,
                     const SchemaField* schema_field,
                     const SchemaManifest* schema_manifest)
       : ctx_(ctx),
@@ -340,7 +305,35 @@ class ArrowColumnWriter {
         schema_field_(schema_field),
         schema_manifest_(schema_manifest) {}
 
-  Status Write(const Array& data);
+  Status Write(const Array& data) {
+    if (data.length() == 0) {
+      // Write nothing when length is 0
+      return Status::OK();
+    }
+
+    ::arrow::Type::type values_type;
+    RETURN_NOT_OK(GetLeafType(*data.type(), &values_type));
+
+    std::shared_ptr<Array> _values_array;
+    int64_t values_offset = 0;
+    int64_t num_levels = 0;
+    int64_t num_values = 0;
+    LevelBuilder level_builder(ctx_->memory_pool, schema_field_, schema_manifest_);
+    std::shared_ptr<Buffer> def_levels_buffer, rep_levels_buffer;
+    RETURN_NOT_OK(level_builder.GenerateLevels(
+        data, &values_offset, &num_values, &num_levels, ctx_->def_levels_buffer,
+        &def_levels_buffer, &rep_levels_buffer, &_values_array));
+    const int16_t* def_levels = nullptr;
+    if (def_levels_buffer) {
+      def_levels = reinterpret_cast<const int16_t*>(def_levels_buffer->data());
+    }
+    const int16_t* rep_levels = nullptr;
+    if (rep_levels_buffer) {
+      rep_levels = reinterpret_cast<const int16_t*>(rep_levels_buffer->data());
+    }
+    std::shared_ptr<Array> values_array = _values_array->Slice(values_offset, num_values);
+    return writer_->WriteArrow(def_levels, rep_levels, num_levels, *values_array, ctx_);
+  }
 
   Status Write(const ChunkedArray& data, int64_t offset, const int64_t size) {
     if (data.length() == 0) {
@@ -394,646 +387,24 @@ class ArrowColumnWriter {
   }
 
  private:
-  template <typename ParquetType, typename ArrowType>
-  Status TypedWriteBatch(const Array& data, int64_t num_levels, const int16_t* def_levels,
-                         const int16_t* rep_levels);
-
-  Status WriteTimestamps(const Array& data, int64_t num_levels, const int16_t* def_levels,
-                         const int16_t* rep_levels);
-
-  Status WriteTimestampsCoerce(const Array& data, int64_t num_levels,
-                               const int16_t* def_levels, const int16_t* rep_levels,
-                               const ArrowWriterProperties& properties);
-
-  template <typename ParquetType, typename ArrowType>
-  Status WriteNonNullableBatch(const ArrowType& type, int64_t num_values,
-                               int64_t num_levels, const int16_t* def_levels,
-                               const int16_t* rep_levels,
-                               const typename ArrowType::c_type* values);
-
-  template <typename ParquetType, typename ArrowType>
-  Status WriteNullableBatch(const ArrowType& type, int64_t num_values, int64_t num_levels,
-                            const int16_t* def_levels, const int16_t* rep_levels,
-                            const uint8_t* valid_bits, int64_t valid_bits_offset,
-                            const typename ArrowType::c_type* values);
-
-  template <typename ParquetType>
-  Status WriteBatch(int64_t num_levels, const int16_t* def_levels,
-                    const int16_t* rep_levels,
-                    const typename ParquetType::c_type* values) {
-    auto typed_writer =
-        ::arrow::internal::checked_cast<TypedColumnWriter<ParquetType>*>(writer_);
-    // WriteBatch was called with type mismatching the writer_'s type. This
-    // could be a schema conversion problem.
-    DCHECK(typed_writer);
-    PARQUET_CATCH_NOT_OK(
-        typed_writer->WriteBatch(num_levels, def_levels, rep_levels, values));
-    return Status::OK();
-  }
-
-  template <typename ParquetType>
-  Status WriteBatchSpaced(int64_t num_levels, const int16_t* def_levels,
-                          const int16_t* rep_levels, const uint8_t* valid_bits,
-                          int64_t valid_bits_offset,
-                          const typename ParquetType::c_type* values) {
-    auto typed_writer =
-        ::arrow::internal::checked_cast<TypedColumnWriter<ParquetType>*>(writer_);
-    // WriteBatchSpaced was called with type mismatching the writer_'s type. This
-    // could be a schema conversion problem.
-    DCHECK(typed_writer);
-    PARQUET_CATCH_NOT_OK(typed_writer->WriteBatchSpaced(
-        num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, values));
-    return Status::OK();
-  }
-
-  ColumnWriterContext* ctx_;
+  ArrowWriteContext* ctx_;
   ColumnWriter* writer_;
   const SchemaField* schema_field_;
   const SchemaManifest* schema_manifest_;
 };
 
-template <typename ParquetType, typename ArrowType>
-Status ArrowColumnWriter::TypedWriteBatch(const Array& array, int64_t num_levels,
-                                          const int16_t* def_levels,
-                                          const int16_t* rep_levels) {
-  using ArrowCType = typename ArrowType::c_type;
-
-  const auto& data = static_cast<const PrimitiveArray&>(array);
-  const ArrowCType* values = nullptr;
-  // The values buffer may be null if the array is empty (ARROW-2744)
-  if (data.values() != nullptr) {
-    values = reinterpret_cast<const ArrowCType*>(data.values()->data()) + data.offset();
-  } else {
-    DCHECK_EQ(data.length(), 0);
-  }
-
-  if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
-    // no nulls, just dump the data
-    RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(
-        static_cast<const ArrowType&>(*array.type()), array.length(), num_levels,
-        def_levels, rep_levels, values)));
-  } else {
-    const uint8_t* valid_bits = data.null_bitmap_data();
-    RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(
-        static_cast<const ArrowType&>(*array.type()), data.length(), num_levels,
-        def_levels, rep_levels, valid_bits, data.offset(), values)));
-  }
-  return Status::OK();
-}
-
-template <typename ParquetType, typename ArrowType>
-Status ArrowColumnWriter::WriteNonNullableBatch(
-    const ArrowType& type, int64_t num_values, int64_t num_levels,
-    const int16_t* def_levels, const int16_t* rep_levels,
-    const typename ArrowType::c_type* values) {
-  using ParquetCType = typename ParquetType::c_type;
-  ParquetCType* buffer;
-  RETURN_NOT_OK(ctx_->GetScratchData<ParquetCType>(num_values, &buffer));
-
-  std::copy(values, values + num_values, buffer);
-
-  return WriteBatch<ParquetType>(num_levels, def_levels, rep_levels, buffer);
-}
-
-template <>
-Status ArrowColumnWriter::WriteNonNullableBatch<Int32Type, ::arrow::Date64Type>(
-    const ::arrow::Date64Type& type, int64_t num_values, int64_t num_levels,
-    const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) {
-  int32_t* buffer;
-  RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_levels, &buffer));
-
-  for (int i = 0; i < num_values; i++) {
-    buffer[i] = static_cast<int32_t>(values[i] / 86400000);
-  }
-
-  return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, buffer);
-}
-
-template <>
-Status ArrowColumnWriter::WriteNonNullableBatch<Int32Type, ::arrow::Time32Type>(
-    const ::arrow::Time32Type& type, int64_t num_values, int64_t num_levels,
-    const int16_t* def_levels, const int16_t* rep_levels, const int32_t* values) {
-  int32_t* buffer;
-  RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_levels, &buffer));
-  if (type.unit() == TimeUnit::SECOND) {
-    for (int i = 0; i < num_values; i++) {
-      buffer[i] = values[i] * 1000;
-    }
-  } else {
-    std::copy(values, values + num_values, buffer);
-  }
-  return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, buffer);
-}
-
-#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                 \
-  template <>                                                                      \
-  Status ArrowColumnWriter::WriteNonNullableBatch<ParquetType, ArrowType>(         \
-      const ArrowType& type, int64_t num_values, int64_t num_levels,               \
-      const int16_t* def_levels, const int16_t* rep_levels, const CType* buffer) { \
-    return WriteBatch<ParquetType>(num_levels, def_levels, rep_levels, buffer);    \
-  }
-
-NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
-NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
-NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
-NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
-NONNULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
-NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
-
-template <typename ParquetType, typename ArrowType>
-Status ArrowColumnWriter::WriteNullableBatch(
-    const ArrowType& type, int64_t num_values, int64_t num_levels,
-    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
-    int64_t valid_bits_offset, const typename ArrowType::c_type* values) {
-  using ParquetCType = typename ParquetType::c_type;
-
-  ParquetCType* buffer;
-  RETURN_NOT_OK(ctx_->GetScratchData<ParquetCType>(num_values, &buffer));
-  for (int i = 0; i < num_values; i++) {
-    buffer[i] = static_cast<ParquetCType>(values[i]);
-  }
-
-  return WriteBatchSpaced<ParquetType>(num_levels, def_levels, rep_levels, valid_bits,
-                                       valid_bits_offset, buffer);
-}
-
-template <>
-Status ArrowColumnWriter::WriteNullableBatch<Int32Type, ::arrow::Date64Type>(
-    const ::arrow::Date64Type& type, int64_t num_values, int64_t num_levels,
-    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
-    int64_t valid_bits_offset, const int64_t* values) {
-  int32_t* buffer;
-  RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_values, &buffer));
-
-  for (int i = 0; i < num_values; i++) {
-    // Convert from milliseconds into days since the epoch
-    buffer[i] = static_cast<int32_t>(values[i] / 86400000);
-  }
-
-  return WriteBatchSpaced<Int32Type>(num_levels, def_levels, rep_levels, valid_bits,
-                                     valid_bits_offset, buffer);
-}
-
-template <>
-Status ArrowColumnWriter::WriteNullableBatch<Int32Type, ::arrow::Time32Type>(
-    const ::arrow::Time32Type& type, int64_t num_values, int64_t num_levels,
-    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
-    int64_t valid_bits_offset, const int32_t* values) {
-  int32_t* buffer;
-  RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_values, &buffer));
-
-  if (type.unit() == TimeUnit::SECOND) {
-    for (int i = 0; i < num_values; i++) {
-      buffer[i] = values[i] * 1000;
-    }
-  } else {
-    for (int i = 0; i < num_values; i++) {
-      buffer[i] = values[i];
-    }
-  }
-  return WriteBatchSpaced<Int32Type>(num_levels, def_levels, rep_levels, valid_bits,
-                                     valid_bits_offset, buffer);
-}
-
-#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                          \
-  template <>                                                                            \
-  Status ArrowColumnWriter::WriteNullableBatch<ParquetType, ArrowType>(                  \
-      const ArrowType& type, int64_t num_values, int64_t num_levels,                     \
-      const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,   \
-      int64_t valid_bits_offset, const CType* values) {                                  \
-    return WriteBatchSpaced<ParquetType>(num_levels, def_levels, rep_levels, valid_bits, \
-                                         valid_bits_offset, values);                     \
-  }
-
-NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
-NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
-NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
-NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
-NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
-NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
-NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
-NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
-
-#define CONV_CASE_LOOP(ConversionFunction) \
-  for (int64_t i = 0; i < num_values; i++) \
-    ConversionFunction(arrow_values[i], &output[i]);
-
-static void ConvertArrowTimestampToParquetInt96(const int64_t* arrow_values,
-                                                int64_t num_values,
-                                                ::arrow::TimeUnit ::type unit_type,
-                                                Int96* output) {
-  switch (unit_type) {
-    case TimeUnit::NANO:
-      CONV_CASE_LOOP(internal::NanosecondsToImpalaTimestamp);
-      break;
-    case TimeUnit::MICRO:
-      CONV_CASE_LOOP(internal::MicrosecondsToImpalaTimestamp);
-      break;
-    case TimeUnit::MILLI:
-      CONV_CASE_LOOP(internal::MillisecondsToImpalaTimestamp);
-      break;
-    case TimeUnit::SECOND:
-      CONV_CASE_LOOP(internal::SecondsToImpalaTimestamp);
-      break;
-  }
-}
-
-#undef CONV_CASE_LOOP
-
-template <>
-Status ArrowColumnWriter::WriteNullableBatch<Int96Type, ::arrow::TimestampType>(
-    const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
-    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
-    int64_t valid_bits_offset, const int64_t* values) {
-  Int96* buffer = nullptr;
-  RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));
-
-  ConvertArrowTimestampToParquetInt96(values, num_values, type.unit(), buffer);
-
-  return WriteBatchSpaced<Int96Type>(num_levels, def_levels, rep_levels, valid_bits,
-                                     valid_bits_offset, buffer);
-}
-
-template <>
-Status ArrowColumnWriter::WriteNonNullableBatch<Int96Type, ::arrow::TimestampType>(
-    const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
-    const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) {
-  Int96* buffer = nullptr;
-  RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));
-
-  ConvertArrowTimestampToParquetInt96(values, num_values, type.unit(), buffer);
-
-  return WriteBatch<Int96Type>(num_levels, def_levels, rep_levels, buffer);
-}
-
-Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_levels,
-                                          const int16_t* def_levels,
-                                          const int16_t* rep_levels) {
-  const auto& source_type = static_cast<const ::arrow::TimestampType&>(*values.type());
-
-  if (ctx_->properties->support_deprecated_int96_timestamps()) {
-    // User explicitly requested Int96 timestamps
-    return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(values, num_levels,
-                                                              def_levels, rep_levels);
-  } else if (ctx_->properties->coerce_timestamps_enabled()) {
-    // User explicitly requested coercion to specific unit
-    if (source_type.unit() == ctx_->properties->coerce_timestamps_unit()) {
-      // No data conversion necessary
-      return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(values, num_levels,
-                                                                def_levels, rep_levels);
-    } else {
-      return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels,
-                                   *(ctx_->properties));
-    }
-  } else if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0 &&
-             source_type.unit() == TimeUnit::NANO) {
-    // Absent superseding user instructions, when writing Parquet version 1.0 files,
-    // timestamps in nanoseconds are coerced to microseconds
-    std::shared_ptr<ArrowWriterProperties> properties =
-        (ArrowWriterProperties::Builder())
-            .coerce_timestamps(TimeUnit::MICRO)
-            ->disallow_truncated_timestamps()
-            ->build();
-    return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels, *properties);
-  } else if (source_type.unit() == TimeUnit::SECOND) {
-    // Absent superseding user instructions, timestamps in seconds are coerced to
-    // milliseconds
-    std::shared_ptr<ArrowWriterProperties> properties =
-        (ArrowWriterProperties::Builder()).coerce_timestamps(TimeUnit::MILLI)->build();
-    return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels, *properties);
-  } else {
-    // No data conversion necessary
-    return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(values, num_levels,
-                                                              def_levels, rep_levels);
-  }
-}
-
-#define COERCE_DIVIDE -1
-#define COERCE_INVALID 0
-#define COERCE_MULTIPLY +1
-
-static std::pair<int, int64_t> kTimestampCoercionFactors[4][4] = {
-    // from seconds ...
-    {{COERCE_INVALID, 0},                      // ... to seconds
-     {COERCE_MULTIPLY, 1000},                  // ... to millis
-     {COERCE_MULTIPLY, 1000000},               // ... to micros
-     {COERCE_MULTIPLY, INT64_C(1000000000)}},  // ... to nanos
-    // from millis ...
-    {{COERCE_INVALID, 0},
-     {COERCE_MULTIPLY, 1},
-     {COERCE_MULTIPLY, 1000},
-     {COERCE_MULTIPLY, 1000000}},
-    // from micros ...
-    {{COERCE_INVALID, 0},
-     {COERCE_DIVIDE, 1000},
-     {COERCE_MULTIPLY, 1},
-     {COERCE_MULTIPLY, 1000}},
-    // from nanos ...
-    {{COERCE_INVALID, 0},
-     {COERCE_DIVIDE, 1000000},
-     {COERCE_DIVIDE, 1000},
-     {COERCE_MULTIPLY, 1}}};
-
-Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t num_levels,
-                                                const int16_t* def_levels,
-                                                const int16_t* rep_levels,
-                                                const ArrowWriterProperties& properties) {
-  int64_t* buffer;
-  RETURN_NOT_OK(ctx_->GetScratchData<int64_t>(num_levels, &buffer));
-
-  const auto& data = static_cast<const ::arrow::TimestampArray&>(array);
-  auto values = data.raw_values();
-
-  const auto& source_type = static_cast<const ::arrow::TimestampType&>(*array.type());
-  auto source_unit = source_type.unit();
-
-  TimeUnit::type target_unit = properties.coerce_timestamps_unit();
-  auto target_type = ::arrow::timestamp(target_unit);
-  bool truncation_allowed = properties.truncated_timestamps_allowed();
-
-  auto DivideBy = [&](const int64_t factor) {
-    for (int64_t i = 0; i < array.length(); i++) {
-      if (!truncation_allowed && !data.IsNull(i) && (values[i] % factor != 0)) {
-        return Status::Invalid("Casting from ", source_type.ToString(), " to ",
-                               target_type->ToString(), " would lose data: ", values[i]);
-      }
-      buffer[i] = values[i] / factor;
-    }
-    return Status::OK();
-  };
-
-  auto MultiplyBy = [&](const int64_t factor) {
-    for (int64_t i = 0; i < array.length(); i++) {
-      buffer[i] = values[i] * factor;
-    }
-    return Status::OK();
-  };
-
-  const auto& coercion = kTimestampCoercionFactors[static_cast<int>(source_unit)]
-                                                  [static_cast<int>(target_unit)];
-  // .first -> coercion operation; .second -> scale factor
-  DCHECK_NE(coercion.first, COERCE_INVALID);
-  RETURN_NOT_OK(coercion.first == COERCE_DIVIDE ? DivideBy(coercion.second)
-                                                : MultiplyBy(coercion.second));
-
-  if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
-    // no nulls, just dump the data
-    RETURN_NOT_OK((WriteNonNullableBatch<Int64Type, ::arrow::TimestampType>(
-        static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
-        num_levels, def_levels, rep_levels, buffer)));
-  } else {
-    const uint8_t* valid_bits = data.null_bitmap_data();
-    RETURN_NOT_OK((WriteNullableBatch<Int64Type, ::arrow::TimestampType>(
-        static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
-        num_levels, def_levels, rep_levels, valid_bits, data.offset(), buffer)));
-  }
-
-  return Status::OK();
-}
-
-#undef COERCE_DIVIDE
-#undef COERCE_INVALID
-#undef COERCE_MULTIPLY
-
-// This specialization seems quite similar but it significantly differs in two points:
-// * offset is added at the most latest time to the pointer as we have sub-byte access
-// * Arrow data is stored bitwise thus we cannot use std::copy to transform from
-//   ArrowType::c_type to ParquetType::c_type
-
-template <>
-Status ArrowColumnWriter::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
-    const Array& array, int64_t num_levels, const int16_t* def_levels,
-    const int16_t* rep_levels) {
-  bool* buffer = nullptr;
-  RETURN_NOT_OK(ctx_->GetScratchData<bool>(array.length(), &buffer));
-
-  const auto& data = static_cast<const BooleanArray&>(array);
-  const uint8_t* values = nullptr;
-  // The values buffer may be null if the array is empty (ARROW-2744)
-  if (data.values() != nullptr) {
-    values = reinterpret_cast<const uint8_t*>(data.values()->data());
-  } else {
-    DCHECK_EQ(data.length(), 0);
-  }
-
-  int buffer_idx = 0;
-  int64_t offset = array.offset();
-  for (int i = 0; i < data.length(); i++) {
-    if (!data.IsNull(i)) {
-      buffer[buffer_idx++] = BitUtil::GetBit(values, offset + i);
-    }
-  }
-
-  return WriteBatch<BooleanType>(num_levels, def_levels, rep_levels, buffer);
-}
-
-template <>
-Status ArrowColumnWriter::TypedWriteBatch<Int32Type, ::arrow::NullType>(
-    const Array& array, int64_t num_levels, const int16_t* def_levels,
-    const int16_t* rep_levels) {
-  return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, nullptr);
-}
-
-template <>
-Status ArrowColumnWriter::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
-    const Array& array, int64_t num_levels, const int16_t* def_levels,
-    const int16_t* rep_levels) {
-  ByteArray* buffer = nullptr;
-  RETURN_NOT_OK(ctx_->GetScratchData<ByteArray>(num_levels, &buffer));
-
-  const auto& data = static_cast<const BinaryArray&>(array);
-
-  // In the case of an array consisting of only empty strings or all null,
-  // data.data() points already to a nullptr, thus data.data()->data() will
-  // segfault.
-  const uint8_t* values = nullptr;
-  if (data.value_data()) {
-    values = reinterpret_cast<const uint8_t*>(data.value_data()->data());
-    DCHECK(values != nullptr);
-  }
-
-  // Slice offset is accounted for in raw_value_offsets
-  const int32_t* value_offset = data.raw_value_offsets();
-
-  if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
-    // no nulls, just dump the data
-    for (int64_t i = 0; i < data.length(); i++) {
-      buffer[i] =
-          ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]);
-    }
-  } else {
-    int buffer_idx = 0;
-    for (int64_t i = 0; i < data.length(); i++) {
-      if (!data.IsNull(i)) {
-        buffer[buffer_idx++] =
-            ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]);
-      }
-    }
-  }
-
-  return WriteBatch<ByteArrayType>(num_levels, def_levels, rep_levels, buffer);
-}
-
-template <>
-Status ArrowColumnWriter::TypedWriteBatch<FLBAType, ::arrow::FixedSizeBinaryType>(
-    const Array& array, int64_t num_levels, const int16_t* def_levels,
-    const int16_t* rep_levels) {
-  const auto& data = static_cast<const FixedSizeBinaryArray&>(array);
-  const int64_t length = data.length();
-
-  FLBA* buffer;
-  RETURN_NOT_OK(ctx_->GetScratchData<FLBA>(num_levels, &buffer));
-
-  if (writer_->descr()->schema_node()->is_required() || data.null_count() == 0) {
-    // no nulls, just dump the data
-    // todo(advancedxy): use a writeBatch to avoid this step
-    for (int64_t i = 0; i < length; i++) {
-      buffer[i] = FixedLenByteArray(data.GetValue(i));
-    }
-  } else {
-    int buffer_idx = 0;
-    for (int64_t i = 0; i < length; i++) {
-      if (!data.IsNull(i)) {
-        buffer[buffer_idx++] = FixedLenByteArray(data.GetValue(i));
-      }
-    }
-  }
-
-  return WriteBatch<FLBAType>(num_levels, def_levels, rep_levels, buffer);
-}
-
-template <>
-Status ArrowColumnWriter::TypedWriteBatch<FLBAType, ::arrow::Decimal128Type>(
-    const Array& array, int64_t num_levels, const int16_t* def_levels,
-    const int16_t* rep_levels) {
-  const auto& data = static_cast<const Decimal128Array&>(array);
-  const int64_t length = data.length();
-
-  FLBA* buffer;
-  RETURN_NOT_OK(ctx_->GetScratchData<FLBA>(num_levels, &buffer));
-
-  const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*data.type());
-  const int32_t offset =
-      decimal_type.byte_width() - DecimalSize(decimal_type.precision());
-
-  const bool does_not_have_nulls =
-      writer_->descr()->schema_node()->is_required() || data.null_count() == 0;
-
-  const auto valid_value_count = static_cast<size_t>(length - data.null_count()) * 2;
-  std::vector<uint64_t> big_endian_values(valid_value_count);
-
-  // TODO(phillipc): Look into whether our compilers will perform loop unswitching so we
-  // don't have to keep writing two loops to handle the case where we know there are no
-  // nulls
-  if (does_not_have_nulls) {
-    // no nulls, just dump the data
-    // todo(advancedxy): use a writeBatch to avoid this step
-    for (int64_t i = 0, j = 0; i < length; ++i, j += 2) {
-      auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
-      big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
-      big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
-      buffer[i] = FixedLenByteArray(
-          reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
-    }
-  } else {
-    for (int64_t i = 0, buffer_idx = 0, j = 0; i < length; ++i) {
-      if (!data.IsNull(i)) {
-        auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
-        big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
-        big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
-        buffer[buffer_idx++] = FixedLenByteArray(
-            reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
-        j += 2;
-      }
-    }
-  }
-
-  return WriteBatch<FLBAType>(num_levels, def_levels, rep_levels, buffer);
-}
-
-Status ArrowColumnWriter::Write(const Array& data) {
-  if (data.length() == 0) {
-    // Write nothing when length is 0
-    return Status::OK();
-  }
-
-  ::arrow::Type::type values_type;
-  RETURN_NOT_OK(GetLeafType(*data.type(), &values_type));
-
-  std::shared_ptr<Array> _values_array;
-  int64_t values_offset = 0;
-  int64_t num_levels = 0;
-  int64_t num_values = 0;
-  LevelBuilder level_builder(ctx_->memory_pool, schema_field_, schema_manifest_);
-  std::shared_ptr<Buffer> def_levels_buffer, rep_levels_buffer;
-  RETURN_NOT_OK(level_builder.GenerateLevels(
-      data, &values_offset, &num_values, &num_levels, ctx_->def_levels_buffer,
-      &def_levels_buffer, &rep_levels_buffer, &_values_array));
-  const int16_t* def_levels = nullptr;
-  if (def_levels_buffer) {
-    def_levels = reinterpret_cast<const int16_t*>(def_levels_buffer->data());
-  }
-  const int16_t* rep_levels = nullptr;
-  if (rep_levels_buffer) {
-    rep_levels = reinterpret_cast<const int16_t*>(rep_levels_buffer->data());
-  }
-  std::shared_ptr<Array> values_array = _values_array->Slice(values_offset, num_values);
-
-#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType)                            \
-  case ::arrow::Type::ArrowEnum:                                                       \
-    return TypedWriteBatch<ParquetType, ::arrow::ArrowType>(*values_array, num_levels, \
-                                                            def_levels, rep_levels);
-
-  switch (values_type) {
-    case ::arrow::Type::UINT32: {
-      if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) {
-        // Parquet 1.0 reader cannot read the UINT_32 logical type. Thus we need
-        // to use the larger Int64Type to store them lossless.
-        return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(*values_array, num_levels,
-                                                               def_levels, rep_levels);
-      } else {
-        return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(*values_array, num_levels,
-                                                               def_levels, rep_levels);
-      }
-    }
-      WRITE_BATCH_CASE(NA, NullType, Int32Type)
-    case ::arrow::Type::TIMESTAMP:
-      return WriteTimestamps(*values_array, num_levels, def_levels, rep_levels);
-      WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)
-      WRITE_BATCH_CASE(INT8, Int8Type, Int32Type)
-      WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type)
-      WRITE_BATCH_CASE(INT16, Int16Type, Int32Type)
-      WRITE_BATCH_CASE(UINT16, UInt16Type, Int32Type)
-      WRITE_BATCH_CASE(INT32, Int32Type, Int32Type)
-      WRITE_BATCH_CASE(INT64, Int64Type, Int64Type)
-      WRITE_BATCH_CASE(UINT64, UInt64Type, Int64Type)
-      WRITE_BATCH_CASE(FLOAT, FloatType, FloatType)
-      WRITE_BATCH_CASE(DOUBLE, DoubleType, DoubleType)
-      WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType)
-      WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType)
-      WRITE_BATCH_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
-      WRITE_BATCH_CASE(DECIMAL, Decimal128Type, FLBAType)
-      WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type)
-      WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
-      WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type)
-      WRITE_BATCH_CASE(TIME64, Time64Type, Int64Type)
-    default:
-      break;
-  }
-  return Status::NotImplemented("Data type not supported as list value: ",
-                                values_array->type()->ToString());
-}
-
 }  // namespace
 
 // ----------------------------------------------------------------------
 // FileWriter implementation
 
-class FileWriter::Impl {
+class FileWriterImpl : public FileWriter {
  public:
-  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
-       const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
-      : writer_(std::move(writer)),
+  FileWriterImpl(const std::shared_ptr<::arrow::Schema>& schema, MemoryPool* pool,
+                 std::unique_ptr<ParquetFileWriter> writer,
+                 const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
+      : schema_(schema),
+        writer_(std::move(writer)),
         row_group_writer_(nullptr),
         column_write_context_(pool, arrow_properties.get()),
         arrow_properties_(arrow_properties),
@@ -1044,7 +415,7 @@ class FileWriter::Impl {
                                &schema_manifest_);
   }
 
-  Status NewRowGroup(int64_t chunk_size) {
+  Status NewRowGroup(int64_t chunk_size) override {
     if (row_group_writer_ != nullptr) {
       PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
     }
@@ -1052,7 +423,7 @@ class FileWriter::Impl {
     return Status::OK();
   }
 
-  Status Close() {
+  Status Close() override {
     if (!closed_) {
       // Make idempotent
       closed_ = true;
@@ -1064,7 +435,7 @@ class FileWriter::Impl {
     return Status::OK();
   }
 
-  Status WriteColumnChunk(const Array& data) {
+  Status WriteColumnChunk(const Array& data) override {
     // A bit awkward here since cannot instantiate ChunkedArray from const Array&
     ::arrow::ArrayVector chunks = {::arrow::MakeArray(data.data())};
     auto chunked_array = std::make_shared<::arrow::ChunkedArray>(chunks);
@@ -1072,7 +443,7 @@ class FileWriter::Impl {
   }
 
   Status WriteColumnChunk(const std::shared_ptr<ChunkedArray>& data, int64_t offset,
-                          const int64_t size) {
+                          int64_t size) override {
     // DictionaryArrays are not yet handled with a fast path. To still support
     // writing them as a workaround, we convert them back to their non-dictionary
     // representation.
@@ -1107,51 +478,70 @@ class FileWriter::Impl {
     return arrow_writer.Close();
   }
 
-  const WriterProperties& properties() const { return *writer_->properties(); }
+  Status WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data) override {
+    return WriteColumnChunk(data, 0, data->length());
+  }
+
+  Status WriteTable(const Table& table, int64_t chunk_size) override {
+    RETURN_NOT_OK(table.Validate());
+
+    if (chunk_size <= 0 && table.num_rows() > 0) {
+      return Status::Invalid("chunk size per row_group must be greater than 0");
+    } else if (!table.schema()->Equals(*schema_, false)) {
+      return Status::Invalid("table schema does not match this writer's. table:'",
+                             table.schema()->ToString(), "' this:'", schema_->ToString(),
+                             "'");
+    } else if (chunk_size > this->properties().max_row_group_length()) {
+      chunk_size = this->properties().max_row_group_length();
+    }
+
+    auto WriteRowGroup = [&](int64_t offset, int64_t size) {
+      RETURN_NOT_OK(NewRowGroup(size));
+      for (int i = 0; i < table.num_columns(); i++) {
+        RETURN_NOT_OK(WriteColumnChunk(table.column(i), offset, size));
+      }
+      return Status::OK();
+    };
+
+    if (table.num_rows() == 0) {
+      // Append a row group with 0 rows
+      RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close()));
+      return Status::OK();
+    }
+
+    for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
+      int64_t offset = chunk * chunk_size;
+      RETURN_NOT_OK_ELSE(
+          WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)),
+          PARQUET_IGNORE_NOT_OK(Close()));
+    }
+    return Status::OK();
+  }
 
-  ::arrow::MemoryPool* memory_pool() const { return column_write_context_.memory_pool; }
+  const WriterProperties& properties() const { return *writer_->properties(); }
 
-  virtual ~Impl() {}
+  ::arrow::MemoryPool* memory_pool() const override {
+    return column_write_context_.memory_pool;
+  }
 
-  const std::shared_ptr<FileMetaData> metadata() const { return writer_->metadata(); }
+  const std::shared_ptr<FileMetaData> metadata() const override {
+    return writer_->metadata();
+  }
 
  private:
   friend class FileWriter;
 
+  std::shared_ptr<::arrow::Schema> schema_;
+
   SchemaManifest schema_manifest_;
 
   std::unique_ptr<ParquetFileWriter> writer_;
   RowGroupWriter* row_group_writer_;
-  ColumnWriterContext column_write_context_;
+  ArrowWriteContext column_write_context_;
   std::shared_ptr<ArrowWriterProperties> arrow_properties_;
   bool closed_;
 };
 
-Status FileWriter::NewRowGroup(int64_t chunk_size) {
-  return impl_->NewRowGroup(chunk_size);
-}
-
-Status FileWriter::WriteColumnChunk(const ::arrow::Array& data) {
-  return impl_->WriteColumnChunk(data);
-}
-
-Status FileWriter::WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data,
-                                    const int64_t offset, const int64_t size) {
-  return impl_->WriteColumnChunk(data, offset, size);
-}
-
-Status FileWriter::WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data) {
-  return WriteColumnChunk(data, 0, data->length());
-}
-
-Status FileWriter::Close() { return impl_->Close(); }
-
-MemoryPool* FileWriter::memory_pool() const { return impl_->memory_pool(); }
-
-const std::shared_ptr<FileMetaData> FileWriter::metadata() const {
-  return impl_->metadata();
-}
-
 FileWriter::~FileWriter() {}
 
 Status FileWriter::Make(::arrow::MemoryPool* pool,
@@ -1159,16 +549,13 @@ Status FileWriter::Make(::arrow::MemoryPool* pool,
                         const std::shared_ptr<::arrow::Schema>& schema,
                         const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
                         std::unique_ptr<FileWriter>* out) {
-  out->reset(new FileWriter(pool, std::move(writer), schema, arrow_properties));
-  return (*out)->impl_->Init();
+  std::unique_ptr<FileWriterImpl> impl(
+      new FileWriterImpl(schema, pool, std::move(writer), arrow_properties));
+  RETURN_NOT_OK(impl->Init());
+  *out = std::move(impl);
+  return Status::OK();
 }
 
-FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
-                       const std::shared_ptr<::arrow::Schema>& schema,
-                       const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
-    : impl_(new FileWriter::Impl(pool, std::move(writer), arrow_properties)),
-      schema_(schema) {}
-
 Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
                         const std::shared_ptr<::arrow::io::OutputStream>& sink,
                         const std::shared_ptr<WriterProperties>& properties,
@@ -1206,42 +593,6 @@ Status WriteMetaDataFile(const FileMetaData& file_metadata,
   return Status::OK();
 }
 
-Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
-  RETURN_NOT_OK(table.Validate());
-
-  if (chunk_size <= 0 && table.num_rows() > 0) {
-    return Status::Invalid("chunk size per row_group must be greater than 0");
-  } else if (!table.schema()->Equals(*schema_, false)) {
-    return Status::Invalid("table schema does not match this writer's. table:'",
-                           table.schema()->ToString(), "' this:'", schema_->ToString(),
-                           "'");
-  } else if (chunk_size > impl_->properties().max_row_group_length()) {
-    chunk_size = impl_->properties().max_row_group_length();
-  }
-
-  auto WriteRowGroup = [&](int64_t offset, int64_t size) {
-    RETURN_NOT_OK(NewRowGroup(size));
-    for (int i = 0; i < table.num_columns(); i++) {
-      RETURN_NOT_OK(WriteColumnChunk(table.column(i), offset, size));
-    }
-    return Status::OK();
-  };
-
-  if (table.num_rows() == 0) {
-    // Append a row group with 0 rows
-    RETURN_NOT_OK_ELSE(WriteRowGroup(0, 0), PARQUET_IGNORE_NOT_OK(Close()));
-    return Status::OK();
-  }
-
-  for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
-    int64_t offset = chunk * chunk_size;
-    RETURN_NOT_OK_ELSE(
-        WriteRowGroup(offset, std::min(chunk_size, table.num_rows() - offset)),
-        PARQUET_IGNORE_NOT_OK(Close()));
-  }
-  return Status::OK();
-}
-
 Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
                   const std::shared_ptr<::arrow::io::OutputStream>& sink,
                   int64_t chunk_size, const std::shared_ptr<WriterProperties>& properties,
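
With this change, WriteTable is implemented on the FileWriterImpl subclass rather than on FileWriter directly, while the iterative API documented in the writer.h diff below stays on FileWriter. A sketch mirroring the WriteRowGroup lambda above, assuming an open std::unique_ptr<parquet::arrow::FileWriter> writer and a ::arrow::Table table whose schema matches it:

    // One row group holding the whole table, written column by column.
    RETURN_NOT_OK(writer->NewRowGroup(table.num_rows()));
    for (int i = 0; i < table.num_columns(); i++) {
      RETURN_NOT_OK(writer->WriteColumnChunk(table.column(i), /*offset=*/0,
                                             /*size=*/table.num_rows()));
    }
    RETURN_NOT_OK(writer->Close());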
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 8f7d499..b6c0c58 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -19,19 +19,16 @@
 #define PARQUET_ARROW_WRITER_H
 
 #include <cstdint>
-#include <cstring>
 #include <memory>
 
 #include "parquet/platform.h"
 #include "parquet/properties.h"
-#include "parquet/types.h"
-
-#include "arrow/type.h"
 
 namespace arrow {
 
 class Array;
 class ChunkedArray;
+class Schema;
 class Table;
 
 }  // namespace arrow
@@ -43,84 +40,6 @@ class ParquetFileWriter;
 
 namespace arrow {
 
-class PARQUET_EXPORT ArrowWriterProperties {
- public:
-  class Builder {
-   public:
-    Builder()
-        : write_timestamps_as_int96_(false),
-          coerce_timestamps_enabled_(false),
-          coerce_timestamps_unit_(::arrow::TimeUnit::SECOND),
-          truncated_timestamps_allowed_(false) {}
-    virtual ~Builder() {}
-
-    Builder* disable_deprecated_int96_timestamps() {
-      write_timestamps_as_int96_ = false;
-      return this;
-    }
-
-    Builder* enable_deprecated_int96_timestamps() {
-      write_timestamps_as_int96_ = true;
-      return this;
-    }
-
-    Builder* coerce_timestamps(::arrow::TimeUnit::type unit) {
-      coerce_timestamps_enabled_ = true;
-      coerce_timestamps_unit_ = unit;
-      return this;
-    }
-
-    Builder* allow_truncated_timestamps() {
-      truncated_timestamps_allowed_ = true;
-      return this;
-    }
-
-    Builder* disallow_truncated_timestamps() {
-      truncated_timestamps_allowed_ = false;
-      return this;
-    }
-
-    std::shared_ptr<ArrowWriterProperties> build() {
-      return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties(
-          write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_,
-          truncated_timestamps_allowed_));
-    }
-
-   private:
-    bool write_timestamps_as_int96_;
-
-    bool coerce_timestamps_enabled_;
-    ::arrow::TimeUnit::type coerce_timestamps_unit_;
-    bool truncated_timestamps_allowed_;
-  };
-
-  bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; }
-
-  bool coerce_timestamps_enabled() const { return coerce_timestamps_enabled_; }
-  ::arrow::TimeUnit::type coerce_timestamps_unit() const {
-    return coerce_timestamps_unit_;
-  }
-
-  bool truncated_timestamps_allowed() const { return truncated_timestamps_allowed_; }
-
- private:
-  explicit ArrowWriterProperties(bool write_nanos_as_int96,
-                                 bool coerce_timestamps_enabled,
-                                 ::arrow::TimeUnit::type coerce_timestamps_unit,
-                                 bool truncated_timestamps_allowed)
-      : write_timestamps_as_int96_(write_nanos_as_int96),
-        coerce_timestamps_enabled_(coerce_timestamps_enabled),
-        coerce_timestamps_unit_(coerce_timestamps_unit),
-        truncated_timestamps_allowed_(truncated_timestamps_allowed) {}
-
-  const bool write_timestamps_as_int96_;
-  const bool coerce_timestamps_enabled_;
-  const ::arrow::TimeUnit::type coerce_timestamps_unit_;
-  const bool truncated_timestamps_allowed_;
-};
-
-std::shared_ptr<ArrowWriterProperties> PARQUET_EXPORT default_arrow_writer_properties();
-
 /**
  * Iterative API:
  *  Start a new RowGroup/Chunk with NewRowGroup
@@ -129,49 +48,41 @@ std::shared_ptr<ArrowWriterProperties> PARQUET_EXPORT default_arrow_writer_prope
 class PARQUET_EXPORT FileWriter {
  public:
   static ::arrow::Status Make(
-      ::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+      MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
       const std::shared_ptr<::arrow::Schema>& schema,
       const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
       std::unique_ptr<FileWriter>* out);
 
-  static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+  static ::arrow::Status Open(const ::arrow::Schema& schema, MemoryPool* pool,
                               const std::shared_ptr<::arrow::io::OutputStream>& sink,
                               const std::shared_ptr<WriterProperties>& properties,
                               std::unique_ptr<FileWriter>* writer);
 
   static ::arrow::Status Open(
-      const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+      const ::arrow::Schema& schema, MemoryPool* pool,
       const std::shared_ptr<::arrow::io::OutputStream>& sink,
       const std::shared_ptr<WriterProperties>& properties,
       const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
       std::unique_ptr<FileWriter>* writer);
 
   /// \brief Write a Table to Parquet.
-  ::arrow::Status WriteTable(const ::arrow::Table& table, int64_t chunk_size);
+  virtual ::arrow::Status WriteTable(const ::arrow::Table& table, int64_t chunk_size) = 0;
 
-  ::arrow::Status NewRowGroup(int64_t chunk_size);
-  ::arrow::Status WriteColumnChunk(const ::arrow::Array& data);
+  virtual ::arrow::Status NewRowGroup(int64_t chunk_size) = 0;
+  virtual ::arrow::Status WriteColumnChunk(const ::arrow::Array& data) = 0;
 
   /// \brief Write ColumnChunk in row group using slice of a ChunkedArray
-  ::arrow::Status WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data,
-                                   const int64_t offset, const int64_t size);
-  ::arrow::Status WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data);
-  ::arrow::Status Close();
+  virtual ::arrow::Status WriteColumnChunk(
+      const std::shared_ptr<::arrow::ChunkedArray>& data, int64_t offset,
+      int64_t size) = 0;
 
+  virtual ::arrow::Status WriteColumnChunk(
+      const std::shared_ptr<::arrow::ChunkedArray>& data) = 0;
+  virtual ::arrow::Status Close() = 0;
   virtual ~FileWriter();
 
-  ::arrow::MemoryPool* memory_pool() const;
-
-  const std::shared_ptr<FileMetaData> metadata() const;
-
- private:
-  FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
-             const std::shared_ptr<::arrow::Schema>& schema,
-             const std::shared_ptr<ArrowWriterProperties>& arrow_properties);
-
-  class PARQUET_NO_EXPORT Impl;
-  std::unique_ptr<Impl> impl_;
-  std::shared_ptr<::arrow::Schema> schema_;
+  virtual MemoryPool* memory_pool() const = 0;
+  virtual const std::shared_ptr<FileMetaData> metadata() const = 0;
 };
 
 /// \brief Write Parquet file metadata only to indicated Arrow OutputStream
@@ -190,66 +101,13 @@ PARQUET_EXPORT
 * The table must consist only of columns of primitive type or lists of primitive values.
  */
 ::arrow::Status PARQUET_EXPORT WriteTable(
-    const ::arrow::Table& table, ::arrow::MemoryPool* pool,
+    const ::arrow::Table& table, MemoryPool* pool,
     const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size,
     const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
     const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
         default_arrow_writer_properties());
 
-namespace internal {
-
-/**
- * Timestamp conversion constants
- */
-constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588);
-
-template <int64_t UnitPerDay, int64_t NanosecondsPerUnit>
-inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) {
-  int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays;
-  (*impala_timestamp).value[2] = (uint32_t)julian_days;
-
-  int64_t last_day_units = time % UnitPerDay;
-  auto last_day_nanos = last_day_units * NanosecondsPerUnit;
-  // impala_timestamp will be unaligned every other entry so do memcpy instead
-  // of assign and reinterpret cast to avoid undefined behavior.
-  std::memcpy(impala_timestamp, &last_day_nanos, sizeof(int64_t));
-}
-
-constexpr int64_t kSecondsInNanos = INT64_C(1000000000);
-
-inline void SecondsToImpalaTimestamp(const int64_t seconds, Int96* impala_timestamp) {
-  ArrowTimestampToImpalaTimestamp<kSecondsPerDay, kSecondsInNanos>(seconds,
-                                                                   impala_timestamp);
-}
-
-constexpr int64_t kMillisecondsInNanos = kSecondsInNanos / INT64_C(1000);
-
-inline void MillisecondsToImpalaTimestamp(const int64_t milliseconds,
-                                          Int96* impala_timestamp) {
-  ArrowTimestampToImpalaTimestamp<kMillisecondsPerDay, kMillisecondsInNanos>(
-      milliseconds, impala_timestamp);
-}
-
-constexpr int64_t kMicrosecondsInNanos = kMillisecondsInNanos / INT64_C(1000);
-
-inline void MicrosecondsToImpalaTimestamp(const int64_t microseconds,
-                                          Int96* impala_timestamp) {
-  ArrowTimestampToImpalaTimestamp<kMicrosecondsPerDay, kMicrosecondsInNanos>(
-      microseconds, impala_timestamp);
-}
-
-constexpr int64_t kNanosecondsInNanos = INT64_C(1);
-
-inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds,
-                                         Int96* impala_timestamp) {
-  ArrowTimestampToImpalaTimestamp<kNanosecondsPerDay, kNanosecondsInNanos>(
-      nanoseconds, impala_timestamp);
-}
-
-}  // namespace internal
-
 }  // namespace arrow
-
 }  // namespace parquet
 
 #endif  // PARQUET_ARROW_WRITER_H
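
FileWriter is now an abstract interface, so construction goes through the
static Open factories above. A minimal usage sketch, assuming schema, pool,
sink, and table already exist in the caller:

    std::unique_ptr<parquet::arrow::FileWriter> writer;
    RETURN_NOT_OK(parquet::arrow::FileWriter::Open(
        *schema, pool, sink, parquet::default_writer_properties(),
        parquet::default_arrow_writer_properties(), &writer));
    RETURN_NOT_OK(writer->WriteTable(*table, /*chunk_size=*/64 * 1024));
    RETURN_NOT_OK(writer->Close());
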
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 86e21c2..fa16234 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -17,14 +17,17 @@
 
 #include "parquet/column_writer.h"
 
+#include <algorithm>
 #include <cstdint>
 #include <cstring>
 #include <memory>
 #include <utility>
 #include <vector>
 
+#include "arrow/array.h"
 #include "arrow/buffer-builder.h"
-#include "arrow/memory_pool.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
 #include "arrow/util/bit-stream-utils.h"
 #include "arrow/util/checked_cast.h"
 #include "arrow/util/compression.h"
@@ -43,6 +46,7 @@
 
 namespace parquet {
 
+using ::arrow::Status;
 using ::arrow::internal::checked_cast;
 
 using BitWriter = ::arrow::BitUtil::BitWriter;
@@ -131,7 +135,7 @@ class SerializedPageWriter : public PageWriter {
  public:
   SerializedPageWriter(const std::shared_ptr<ArrowOutputStream>& sink,
                        Compression::type codec, ColumnChunkMetaDataBuilder* metadata,
-                       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+                       MemoryPool* pool = ::arrow::default_memory_pool())
       : sink_(sink),
         metadata_(metadata),
         pool_(pool),
@@ -268,7 +272,7 @@ class SerializedPageWriter : public PageWriter {
  private:
   std::shared_ptr<ArrowOutputStream> sink_;
   ColumnChunkMetaDataBuilder* metadata_;
-  ::arrow::MemoryPool* pool_;
+  MemoryPool* pool_;
   int64_t num_values_;
   int64_t dictionary_page_offset_;
   int64_t data_page_offset_;
@@ -286,7 +290,7 @@ class BufferedPageWriter : public PageWriter {
  public:
   BufferedPageWriter(const std::shared_ptr<ArrowOutputStream>& sink,
                      Compression::type codec, ColumnChunkMetaDataBuilder* metadata,
-                     ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+                     MemoryPool* pool = ::arrow::default_memory_pool())
       : final_sink_(sink), metadata_(metadata) {
     in_memory_sink_ = CreateOutputStream(pool);
     pager_ = std::unique_ptr<SerializedPageWriter>(
@@ -334,8 +338,7 @@ class BufferedPageWriter : public PageWriter {
 
 std::unique_ptr<PageWriter> PageWriter::Open(
     const std::shared_ptr<ArrowOutputStream>& sink, Compression::type codec,
-    ColumnChunkMetaDataBuilder* metadata, ::arrow::MemoryPool* pool,
-    bool buffered_row_group) {
+    ColumnChunkMetaDataBuilder* metadata, MemoryPool* pool, bool buffered_row_group) {
   if (buffered_row_group) {
     return std::unique_ptr<PageWriter>(
         new BufferedPageWriter(sink, codec, metadata, pool));
@@ -411,7 +414,9 @@ class ColumnWriterImpl {
   void AddDataPage();
 
   // Serializes Data Pages
-  void WriteDataPage(const CompressedDataPage& page);
+  void WriteDataPage(const CompressedDataPage& page) {
+    total_bytes_written_ += pager_->WriteDataPage(page);
+  }
 
   // Write multiple definition levels
   void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
@@ -445,7 +450,7 @@ class ColumnWriterImpl {
 
   LevelEncoder level_encoder_;
 
-  ::arrow::MemoryPool* allocator_;
+  MemoryPool* allocator_;
 
   // The total number of values stored in the data page. This is the maximum of
   // the number of encoded definition levels or encoded values. For
@@ -586,10 +591,6 @@ void ColumnWriterImpl::AddDataPage() {
   num_buffered_encoded_values_ = 0;
 }
 
-void ColumnWriterImpl::WriteDataPage(const CompressedDataPage& page) {
-  total_bytes_written_ += pager_->WriteDataPage(page);
-}
-
 int64_t ColumnWriterImpl::Close() {
   if (!closed_) {
     closed_ = true;
@@ -658,6 +659,10 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
                         const int16_t* rep_levels, const uint8_t* valid_bits,
                         int64_t valid_bits_offset, const T* values) override;
 
+  Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
+                    int64_t num_levels, const ::arrow::Array& array,
+                    ArrowWriteContext* context) override;
+
   int64_t EstimatedBufferedValueBytes() const override {
     return current_encoder_->EstimatedDataEncodedSize();
   }
@@ -666,7 +671,20 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
   std::shared_ptr<Buffer> GetValuesBuffer() override {
     return current_encoder_->FlushValues();
   }
-  void WriteDictionaryPage() override;
+
+  void WriteDictionaryPage() override {
+    // We have to dynamic_cast here because TypedEncoder<Type> is inherited
+    // virtually, and a static_cast cannot cross a virtual base
+    auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
+    DCHECK(dict_encoder);
+    std::shared_ptr<ResizableBuffer> buffer =
+        AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
+    dict_encoder->WriteDict(buffer->mutable_data());
+
+    DictionaryPage page(buffer, dict_encoder->num_entries(),
+                        properties_->dictionary_page_encoding());
+    total_bytes_written_ += pager_->WriteDictionaryPage(page);
+  }
 
   // Checks if the Dictionary Page size limit is reached
   // If the limit is reached, the Dictionary and Data Pages are serialized
@@ -685,7 +703,12 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
     return result;
   }
 
-  void ResetPageStatistics() override;
+  void ResetPageStatistics() override {
+    if (chunk_statistics_ != nullptr) {
+      chunk_statistics_->Merge(*page_statistics_);
+      page_statistics_->Reset();
+    }
+  }
 
   Type::type type() const override { return descr_->physical_type(); }
 
@@ -700,6 +723,12 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
   const WriterProperties* properties() override { return properties_; }
 
  private:
+  using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
+  using TypedStats = TypedStatistics<DType>;
+  std::unique_ptr<Encoder> current_encoder_;
+  std::shared_ptr<TypedStats> page_statistics_;
+  std::shared_ptr<TypedStats> chunk_statistics_;
+
   inline int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
                                 const int16_t* rep_levels, const T* values);
 
@@ -710,16 +739,16 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter<
                                       int64_t* num_spaced_written);
 
   // Write values to a temporary buffer before they are encoded into pages
-  void WriteValues(int64_t num_values, const T* values);
-  void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits,
-                         int64_t valid_bits_offset, const T* values);
-
-  using ValueEncoderType = typename EncodingTraits<DType>::Encoder;
-  std::unique_ptr<Encoder> current_encoder_;
+  void WriteValues(int64_t num_values, const T* values) {
+    dynamic_cast<ValueEncoderType*>(current_encoder_.get())
+        ->Put(values, static_cast<int>(num_values));
+  }
 
-  using TypedStats = TypedStatistics<DType>;
-  std::shared_ptr<TypedStats> page_statistics_;
-  std::shared_ptr<TypedStats> chunk_statistics_;
+  void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits,
+                         int64_t valid_bits_offset, const T* values) {
+    dynamic_cast<ValueEncoderType*>(current_encoder_.get())
+        ->PutSpaced(values, static_cast<int>(num_values), valid_bits, valid_bits_offset);
+  }
 };
 
 // Only one Dictionary Page is written.
@@ -741,29 +770,6 @@ void TypedColumnWriterImpl<DType>::CheckDictionarySizeLimit() {
   }
 }
 
-template <typename DType>
-void TypedColumnWriterImpl<DType>::WriteDictionaryPage() {
-  // We have to dynamic cast here because TypedEncoder<Type> as some compilers
-  // don't want to cast through virtual inheritance
-  auto dict_encoder = dynamic_cast<DictEncoder<DType>*>(current_encoder_.get());
-  DCHECK(dict_encoder);
-  std::shared_ptr<ResizableBuffer> buffer =
-      AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
-  dict_encoder->WriteDict(buffer->mutable_data());
-
-  DictionaryPage page(buffer, dict_encoder->num_entries(),
-                      properties_->dictionary_page_encoding());
-  total_bytes_written_ += pager_->WriteDictionaryPage(page);
-}
-
-template <typename DType>
-void TypedColumnWriterImpl<DType>::ResetPageStatistics() {
-  if (chunk_statistics_ != nullptr) {
-    chunk_statistics_->Merge(*page_statistics_);
-    page_statistics_->Reset();
-  }
-}
-
 // ----------------------------------------------------------------------
 // Instantiate templated classes
 
@@ -952,19 +958,534 @@ void TypedColumnWriterImpl<DType>::WriteBatchSpaced(
                        values + values_offset, &num_spaced_written);
 }
 
-template <typename DType>
-void TypedColumnWriterImpl<DType>::WriteValues(int64_t num_values, const T* values) {
-  dynamic_cast<ValueEncoderType*>(current_encoder_.get())
-      ->Put(values, static_cast<int>(num_values));
+// ----------------------------------------------------------------------
+// Direct Arrow write path
+
+template <typename ParquetType, typename ArrowType, typename Enable = void>
+struct SerializeFunctor {
+  using ArrowCType = typename ArrowType::c_type;
+  using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
+  using ParquetCType = typename ParquetType::c_type;
+  Status Serialize(const ArrayType& array, ArrowWriteContext*, ParquetCType* out) {
+    const ArrowCType* input = array.raw_values();
+    if (array.null_count() > 0) {
+      for (int i = 0; i < array.length(); i++) {
+        out[i] = static_cast<ParquetCType>(input[i]);
+      }
+    } else {
+      std::copy(input, input + array.length(), out);
+    }
+    return Status::OK();
+  }
+};
+
+template <typename ParquetType, typename ArrowType>
+inline Status SerializeData(const ::arrow::Array& array, ArrowWriteContext* ctx,
+                            typename ParquetType::c_type* out) {
+  using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
+  SerializeFunctor<ParquetType, ArrowType> functor;
+  return functor.Serialize(checked_cast<const ArrayType&>(array), ctx, out);
 }
 
-template <typename DType>
-void TypedColumnWriterImpl<DType>::WriteValuesSpaced(int64_t num_values,
-                                                     const uint8_t* valid_bits,
-                                                     int64_t valid_bits_offset,
-                                                     const T* values) {
-  dynamic_cast<ValueEncoderType*>(current_encoder_.get())
-      ->PutSpaced(values, static_cast<int>(num_values), valid_bits, valid_bits_offset);
+template <typename ParquetType, typename ArrowType>
+Status WriteArrowSerialize(const ::arrow::Array& array, int64_t num_levels,
+                           const int16_t* def_levels, const int16_t* rep_levels,
+                           ArrowWriteContext* ctx,
+                           TypedColumnWriter<ParquetType>* writer) {
+  using ParquetCType = typename ParquetType::c_type;
+
+  ParquetCType* buffer;
+  PARQUET_THROW_NOT_OK(ctx->GetScratchData<ParquetCType>(array.length(), &buffer));
+
+  bool no_nulls =
+      writer->descr()->schema_node()->is_required() || (array.null_count() == 0);
+
+  Status s = SerializeData<ParquetType, ArrowType>(array, ctx, buffer);
+  RETURN_NOT_OK(s);
+  if (no_nulls) {
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer));
+  } else {
+    PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
+                                                  array.null_bitmap_data(),
+                                                  array.offset(), buffer));
+  }
+  return Status::OK();
+}
+
+template <typename ParquetType>
+Status WriteArrowZeroCopy(const ::arrow::Array& array, int64_t num_levels,
+                          const int16_t* def_levels, const int16_t* rep_levels,
+                          ArrowWriteContext* ctx,
+                          TypedColumnWriter<ParquetType>* writer) {
+  using T = typename ParquetType::c_type;
+  const auto& data = static_cast<const ::arrow::PrimitiveArray&>(array);
+  const T* values = nullptr;
+  // The values buffer may be null if the array is empty (ARROW-2744)
+  if (data.values() != nullptr) {
+    values = reinterpret_cast<const T*>(data.values()->data()) + data.offset();
+  } else {
+    DCHECK_EQ(data.length(), 0);
+  }
+  if (writer->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, values));
+  } else {
+    PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(num_levels, def_levels, rep_levels,
+                                                  data.null_bitmap_data(), data.offset(),
+                                                  values));
+  }
+  return Status::OK();
+}
+
+#define WRITE_SERIALIZE_CASE(ArrowEnum, ArrowType, ParquetType)  \
+  case ::arrow::Type::ArrowEnum:                                 \
+    return WriteArrowSerialize<ParquetType, ::arrow::ArrowType>( \
+        array, num_levels, def_levels, rep_levels, ctx, this);
+
+#define WRITE_ZERO_COPY_CASE(ArrowEnum, ArrowType, ParquetType)                       \
+  case ::arrow::Type::ArrowEnum:                                                      \
+    return WriteArrowZeroCopy<ParquetType>(array, num_levels, def_levels, rep_levels, \
+                                           ctx, this);
+
+#define ARROW_UNSUPPORTED()                                          \
+  std::stringstream ss;                                              \
+  ss << "Arrow type " << array.type()->ToString()                    \
+     << " cannot be written to Parquet type " << descr_->ToString(); \
+  return Status::Invalid(ss.str());
+
+// ----------------------------------------------------------------------
+// Write Arrow to BooleanType
+
+template <>
+Status TypedColumnWriterImpl<BooleanType>::WriteArrow(const int16_t* def_levels,
+                                                      const int16_t* rep_levels,
+                                                      int64_t num_levels,
+                                                      const ::arrow::Array& array,
+                                                      ArrowWriteContext* ctx) {
+  if (array.type_id() != ::arrow::Type::BOOL) {
+    ARROW_UNSUPPORTED();
+  }
+  bool* buffer = nullptr;
+  RETURN_NOT_OK(ctx->GetScratchData<bool>(array.length(), &buffer));
+
+  const auto& data = static_cast<const ::arrow::BooleanArray&>(array);
+  const uint8_t* values = nullptr;
+  // The values buffer may be null if the array is empty (ARROW-2744)
+  if (data.values() != nullptr) {
+    values = reinterpret_cast<const uint8_t*>(data.values()->data());
+  } else {
+    DCHECK_EQ(data.length(), 0);
+  }
+
+  int buffer_idx = 0;
+  int64_t offset = array.offset();
+  for (int i = 0; i < data.length(); i++) {
+    if (data.IsValid(i)) {
+      buffer[buffer_idx++] = BitUtil::GetBit(values, offset + i);
+    }
+  }
+  PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, buffer));
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// Write Arrow types to INT32
+
+template <>
+struct SerializeFunctor<Int32Type, ::arrow::Date64Type> {
+  Status Serialize(const ::arrow::Date64Array& array, ArrowWriteContext*, int32_t* out) {
+    const int64_t* input = array.raw_values();
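+    // Date64 holds milliseconds since the Unix epoch; dividing by
+    // 86400000 ms/day yields the days-since-epoch that DATE (INT32) expects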
+    for (int i = 0; i < array.length(); i++) {
+      *out++ = static_cast<int32_t>(*input++ / 86400000);
+    }
+    return Status::OK();
+  }
+};
+
+template <>
+struct SerializeFunctor<Int32Type, ::arrow::Time32Type> {
+  Status Serialize(const ::arrow::Time32Array& array, ArrowWriteContext*, int32_t* out) {
+    const int32_t* input = array.raw_values();
+    const auto& type = static_cast<const ::arrow::Time32Type&>(*array.type());
+    if (type.unit() == ::arrow::TimeUnit::SECOND) {
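+      // Milliseconds are the finest INT32 Parquet time unit, so seconds
+      // are promoted to milliseconds here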
+      for (int i = 0; i < array.length(); i++) {
+        out[i] = input[i] * 1000;
+      }
+    } else {
+      std::copy(input, input + array.length(), out);
+    }
+    return Status::OK();
+  }
+};
+
+template <>
+Status TypedColumnWriterImpl<Int32Type>::WriteArrow(const int16_t* def_levels,
+                                                    const int16_t* rep_levels,
+                                                    int64_t num_levels,
+                                                    const ::arrow::Array& array,
+                                                    ArrowWriteContext* ctx) {
+  switch (array.type()->id()) {
+    case ::arrow::Type::NA: {
+      PARQUET_CATCH_NOT_OK(WriteBatch(num_levels, def_levels, rep_levels, nullptr));
+    } break;
+      WRITE_SERIALIZE_CASE(INT8, Int8Type, Int32Type)
+      WRITE_SERIALIZE_CASE(UINT8, UInt8Type, Int32Type)
+      WRITE_SERIALIZE_CASE(INT16, Int16Type, Int32Type)
+      WRITE_SERIALIZE_CASE(UINT16, UInt16Type, Int32Type)
+      WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int32Type)
+      WRITE_ZERO_COPY_CASE(INT32, Int32Type, Int32Type)
+      WRITE_ZERO_COPY_CASE(DATE32, Date32Type, Int32Type)
+      WRITE_SERIALIZE_CASE(DATE64, Date64Type, Int32Type)
+      WRITE_SERIALIZE_CASE(TIME32, Time32Type, Int32Type)
+    default:
+      ARROW_UNSUPPORTED();
+  }
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// Write Arrow to Int64 and Int96
+
+#define INT96_CONVERT_LOOP(ConversionFunction) \
+  for (int64_t i = 0; i < array.length(); i++) ConversionFunction(input[i], &out[i]);
+
+template <>
+struct SerializeFunctor<Int96Type, ::arrow::TimestampType> {
+  Status Serialize(const ::arrow::TimestampArray& array, ArrowWriteContext*, Int96* out) {
+    const int64_t* input = array.raw_values();
+    const auto& type = static_cast<const ::arrow::TimestampType&>(*array.type());
+    switch (type.unit()) {
+      case ::arrow::TimeUnit::NANO:
+        INT96_CONVERT_LOOP(internal::NanosecondsToImpalaTimestamp);
+        break;
+      case ::arrow::TimeUnit::MICRO:
+        INT96_CONVERT_LOOP(internal::MicrosecondsToImpalaTimestamp);
+        break;
+      case ::arrow::TimeUnit::MILLI:
+        INT96_CONVERT_LOOP(internal::MillisecondsToImpalaTimestamp);
+        break;
+      case ::arrow::TimeUnit::SECOND:
+        INT96_CONVERT_LOOP(internal::SecondsToImpalaTimestamp);
+        break;
+    }
+    return Status::OK();
+  }
+};
+
+#define COERCE_DIVIDE -1
+#define COERCE_INVALID 0
+#define COERCE_MULTIPLY +1
+
+static const std::pair<int, int64_t> kTimestampCoercionFactors[4][4] = {
+    // from seconds ...
+    {{COERCE_INVALID, 0},                      // ... to seconds
+     {COERCE_MULTIPLY, 1000},                  // ... to millis
+     {COERCE_MULTIPLY, 1000000},               // ... to micros
+     {COERCE_MULTIPLY, INT64_C(1000000000)}},  // ... to nanos
+    // from millis ...
+    {{COERCE_INVALID, 0},
+     {COERCE_MULTIPLY, 1},
+     {COERCE_MULTIPLY, 1000},
+     {COERCE_MULTIPLY, 1000000}},
+    // from micros ...
+    {{COERCE_INVALID, 0},
+     {COERCE_DIVIDE, 1000},
+     {COERCE_MULTIPLY, 1},
+     {COERCE_MULTIPLY, 1000}},
+    // from nanos ...
+    {{COERCE_INVALID, 0},
+     {COERCE_DIVIDE, 1000000},
+     {COERCE_DIVIDE, 1000},
+     {COERCE_MULTIPLY, 1}}};
+
+template <>
+struct SerializeFunctor<Int64Type, ::arrow::TimestampType> {
+  Status Serialize(const ::arrow::TimestampArray& array, ArrowWriteContext* ctx,
+                   int64_t* out) {
+    const auto& source_type = static_cast<const ::arrow::TimestampType&>(*array.type());
+    auto source_unit = source_type.unit();
+    const int64_t* values = array.raw_values();
+
+    ::arrow::TimeUnit::type target_unit = ctx->properties->coerce_timestamps_unit();
+    auto target_type = ::arrow::timestamp(target_unit);
+    bool truncation_allowed = ctx->properties->truncated_timestamps_allowed();
+
+    auto DivideBy = [&](const int64_t factor) {
+      for (int64_t i = 0; i < array.length(); i++) {
+        if (!truncation_allowed && array.IsValid(i) && (values[i] % factor != 0)) {
+          return Status::Invalid("Casting from ", source_type.ToString(), " to ",
+                                 target_type->ToString(),
+                                 " would lose data: ", values[i]);
+        }
+        out[i] = values[i] / factor;
+      }
+      return Status::OK();
+    };
+
+    auto MultiplyBy = [&](const int64_t factor) {
+      for (int64_t i = 0; i < array.length(); i++) {
+        out[i] = values[i] * factor;
+      }
+      return Status::OK();
+    };
+
+    const auto& coercion = kTimestampCoercionFactors[static_cast<int>(source_unit)]
+                                                    [static_cast<int>(target_unit)];
+
+    // .first -> coercion operation; .second -> scale factor
+    DCHECK_NE(coercion.first, COERCE_INVALID);
+    return coercion.first == COERCE_DIVIDE ? DivideBy(coercion.second)
+                                           : MultiplyBy(coercion.second);
+  }
+};
+
+#undef COERCE_DIVIDE
+#undef COERCE_INVALID
+#undef COERCE_MULTIPLY
+
+Status WriteTimestamps(const ::arrow::Array& values, int64_t num_levels,
+                       const int16_t* def_levels, const int16_t* rep_levels,
+                       ArrowWriteContext* ctx, TypedColumnWriter<Int64Type>* writer) {
+  const auto& source_type = static_cast<const ::arrow::TimestampType&>(*values.type());
+
+  auto WriteCoerce = [&](const ArrowWriterProperties* properties) {
+    ArrowWriteContext temp_ctx = *ctx;
+    temp_ctx.properties = properties;
+    return WriteArrowSerialize<Int64Type, ::arrow::TimestampType>(
+        values, num_levels, def_levels, rep_levels, &temp_ctx, writer);
+  };
+
+  if (ctx->properties->coerce_timestamps_enabled()) {
+    // User explicitly requested coercion to specific unit
+    if (source_type.unit() == ctx->properties->coerce_timestamps_unit()) {
+      // No data conversion necessary
+      return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels,
+                                           ctx, writer);
+    } else {
+      return WriteCoerce(ctx->properties);
+    }
+  } else if (writer->properties()->version() == ParquetVersion::PARQUET_1_0 &&
+             source_type.unit() == ::arrow::TimeUnit::NANO) {
+    // Absent superseding user instructions, when writing Parquet version 1.0 files,
+    // timestamps in nanoseconds are coerced to microseconds
+    std::shared_ptr<ArrowWriterProperties> properties =
+        (ArrowWriterProperties::Builder())
+            .coerce_timestamps(::arrow::TimeUnit::MICRO)
+            ->disallow_truncated_timestamps()
+            ->build();
+    return WriteCoerce(properties.get());
+  } else if (source_type.unit() == ::arrow::TimeUnit::SECOND) {
+    // Absent superseding user instructions, timestamps in seconds are coerced to
+    // milliseconds
+    std::shared_ptr<ArrowWriterProperties> properties =
+        (ArrowWriterProperties::Builder())
+            .coerce_timestamps(::arrow::TimeUnit::MILLI)
+            ->build();
+    return WriteCoerce(properties.get());
+  } else {
+    // No data conversion necessary
+    return WriteArrowZeroCopy<Int64Type>(values, num_levels, def_levels, rep_levels, ctx,
+                                         writer);
+  }
+}
+
+template <>
+Status TypedColumnWriterImpl<Int64Type>::WriteArrow(const int16_t* def_levels,
+                                                    const int16_t* rep_levels,
+                                                    int64_t num_levels,
+                                                    const ::arrow::Array& array,
+                                                    ArrowWriteContext* ctx) {
+  switch (array.type()->id()) {
+    case ::arrow::Type::TIMESTAMP:
+      return WriteTimestamps(array, num_levels, def_levels, rep_levels, ctx, this);
+      WRITE_ZERO_COPY_CASE(INT64, Int64Type, Int64Type)
+      WRITE_SERIALIZE_CASE(UINT32, UInt32Type, Int64Type)
+      WRITE_SERIALIZE_CASE(UINT64, UInt64Type, Int64Type)
+      WRITE_ZERO_COPY_CASE(TIME64, Time64Type, Int64Type)
+    default:
+      ARROW_UNSUPPORTED();
+  }
+}
+
+template <>
+Status TypedColumnWriterImpl<Int96Type>::WriteArrow(const int16_t* def_levels,
+                                                    const int16_t* rep_levels,
+                                                    int64_t num_levels,
+                                                    const ::arrow::Array& array,
+                                                    ArrowWriteContext* ctx) {
+  if (array.type_id() != ::arrow::Type::TIMESTAMP) {
+    ARROW_UNSUPPORTED();
+  }
+  return WriteArrowSerialize<Int96Type, ::arrow::TimestampType>(
+      array, num_levels, def_levels, rep_levels, ctx, this);
+}
+
+// ----------------------------------------------------------------------
+// Floating point types
+
+template <>
+Status TypedColumnWriterImpl<FloatType>::WriteArrow(const int16_t* def_levels,
+                                                    const int16_t* rep_levels,
+                                                    int64_t num_levels,
+                                                    const ::arrow::Array& array,
+                                                    ArrowWriteContext* ctx) {
+  if (array.type_id() != ::arrow::Type::FLOAT) {
+    ARROW_UNSUPPORTED();
+  }
+  return WriteArrowZeroCopy<FloatType>(array, num_levels, def_levels, rep_levels, ctx,
+                                       this);
+}
+
+template <>
+Status TypedColumnWriterImpl<DoubleType>::WriteArrow(const int16_t* def_levels,
+                                                     const int16_t* rep_levels,
+                                                     int64_t num_levels,
+                                                     const ::arrow::Array& array,
+                                                     ArrowWriteContext* ctx) {
+  if (array.type_id() != ::arrow::Type::DOUBLE) {
+    ARROW_UNSUPPORTED();
+  }
+  return WriteArrowZeroCopy<DoubleType>(array, num_levels, def_levels, rep_levels, ctx,
+                                        this);
+}
+
+// ----------------------------------------------------------------------
+// Write Arrow to BYTE_ARRAY
+
+template <typename ParquetType, typename ArrowType>
+struct SerializeFunctor<ParquetType, ArrowType, ::arrow::enable_if_binary<ArrowType>> {
+  Status Serialize(const ::arrow::BinaryArray& array, ArrowWriteContext*,
+                   ByteArray* out) {
+    // If the array consists only of empty strings, or is entirely null,
+    // array.value_data() may already be a nullptr, in which case
+    // array.value_data()->data() would segfault.
+    const uint8_t* values = nullptr;
+    if (array.value_data()) {
+      values = reinterpret_cast<const uint8_t*>(array.value_data()->data());
+      DCHECK(values != nullptr);
+    }
+
+    // Slice offset is accounted for in raw_value_offsets
+    const int32_t* value_offset = array.raw_value_offsets();
+    if (array.null_count() == 0) {
+      // no nulls, just dump the data
+      for (int64_t i = 0; i < array.length(); i++) {
+        out[i] =
+            ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]);
+      }
+    } else {
+      for (int64_t i = 0; i < array.length(); i++) {
+        if (array.IsValid(i)) {
+          out[i] =
+              ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]);
+        }
+      }
+    }
+    return Status::OK();
+  }
+};
+
+template <>
+Status TypedColumnWriterImpl<ByteArrayType>::WriteArrow(const int16_t* def_levels,
+                                                        const int16_t* rep_levels,
+                                                        int64_t num_levels,
+                                                        const ::arrow::Array& array,
+                                                        ArrowWriteContext* ctx) {
+  switch (array.type()->id()) {
+    WRITE_SERIALIZE_CASE(BINARY, BinaryType, ByteArrayType)
+    WRITE_SERIALIZE_CASE(STRING, BinaryType, ByteArrayType)
+    default:
+      ARROW_UNSUPPORTED();
+  }
+}
+
+// ----------------------------------------------------------------------
+// Write Arrow to FIXED_LEN_BYTE_ARRAY
+
+template <typename ParquetType, typename ArrowType>
+struct SerializeFunctor<ParquetType, ArrowType,
+                        ::arrow::enable_if_fixed_size_binary<ArrowType>> {
+  Status Serialize(const ::arrow::FixedSizeBinaryArray& array, ArrowWriteContext*,
+                   FLBA* out) {
+    if (array.null_count() == 0) {
+      // no nulls, just dump the data
+      // TODO(advancedxy): use WriteBatch to avoid this copy step
+      for (int64_t i = 0; i < array.length(); i++) {
+        out[i] = FixedLenByteArray(array.GetValue(i));
+      }
+    } else {
+      for (int64_t i = 0; i < array.length(); i++) {
+        if (array.IsValid(i)) {
+          out[i] = FixedLenByteArray(array.GetValue(i));
+        }
+      }
+    }
+    return Status::OK();
+  }
+};
+
+template <>
+Status WriteArrowSerialize<FLBAType, ::arrow::Decimal128Type>(
+    const ::arrow::Array& array, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels, ArrowWriteContext* ctx,
+    TypedColumnWriter<FLBAType>* writer) {
+  const auto& data = static_cast<const ::arrow::Decimal128Array&>(array);
+  const int64_t length = data.length();
+
+  FLBA* buffer;
+  RETURN_NOT_OK(ctx->GetScratchData<FLBA>(num_levels, &buffer));
+
+  const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*data.type());
+  const int32_t offset =
+      decimal_type.byte_width() - internal::DecimalSize(decimal_type.precision());
+
+  const bool does_not_have_nulls =
+      writer->descr()->schema_node()->is_required() || data.null_count() == 0;
+
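+  // Each 128-bit decimal occupies two 64-bit words, so reserve two slots
+  // per valid value for the big-endian representation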
+  const auto valid_value_count = static_cast<size_t>(length - data.null_count()) * 2;
+  std::vector<uint64_t> big_endian_values(valid_value_count);
+
+  // TODO(phillipc): Look into whether our compilers will perform loop unswitching so we
+  // don't have to keep writing two loops to handle the case where we know there are no
+  // nulls
+  if (does_not_have_nulls) {
+    // no nulls, just dump the data
+    // TODO(advancedxy): use WriteBatch to avoid this copy step
+    for (int64_t i = 0, j = 0; i < length; ++i, j += 2) {
+      auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
+      big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
+      big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
+      buffer[i] = FixedLenByteArray(
+          reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
+    }
+  } else {
+    for (int64_t i = 0, buffer_idx = 0, j = 0; i < length; ++i) {
+      if (data.IsValid(i)) {
+        auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
+        big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
+        big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
+        buffer[buffer_idx++] = FixedLenByteArray(
+            reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
+        j += 2;
+      }
+    }
+  }
+  PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, buffer));
+  return Status::OK();
+}
+
+template <>
+Status TypedColumnWriterImpl<FLBAType>::WriteArrow(const int16_t* def_levels,
+                                                   const int16_t* rep_levels,
+                                                   int64_t num_levels,
+                                                   const ::arrow::Array& array,
+                                                   ArrowWriteContext* ctx) {
+  switch (array.type()->id()) {
+    WRITE_SERIALIZE_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
+    WRITE_SERIALIZE_CASE(DECIMAL, Decimal128Type, FLBAType)
+    default:
+      // Reject unsupported Arrow types rather than silently succeeding,
+      // matching the other WriteArrow specializations
+      ARROW_UNSUPPORTED();
+  }
 }
 
 // ----------------------------------------------------------------------
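
The kTimestampCoercionFactors table reduces every unit conversion to either a
multiply or a checked divide. A sketch of the divide path alone (micros to
millis, factor 1000), showing where the truncation check bites:

    // Sketch of the COERCE_DIVIDE case: a nonzero remainder means data loss,
    // which is an error unless allow_truncated_timestamps() was set.
    ::arrow::Status CoerceMicrosToMillis(const int64_t* in, int64_t n,
                                         bool truncation_allowed, int64_t* out) {
      for (int64_t i = 0; i < n; ++i) {
        if (!truncation_allowed && in[i] % 1000 != 0) {
          return ::arrow::Status::Invalid("Casting would lose data: ", in[i]);
        }
        out[i] = in[i] / 1000;
      }
      return ::arrow::Status::OK();
    }
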
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index 0a67a86..27ca400 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <cstdint>
+#include <cstring>
 #include <memory>
 
 #include "parquet/exception.h"
@@ -26,6 +27,8 @@
 
 namespace arrow {
 
+class Array;
+
 namespace BitUtil {
 class BitWriter;
 }  // namespace BitUtil
@@ -38,6 +41,7 @@ class RleEncoder;
 
 namespace parquet {
 
+struct ArrowWriteContext;
 class ColumnDescriptor;
 class CompressedDataPage;
 class DictionaryPage;
@@ -130,6 +134,13 @@ class PARQUET_EXPORT ColumnWriter {
 
   /// \brief The file-level writer properties
   virtual const WriterProperties* properties() = 0;
+
+  /// \brief Write Apache Arrow columnar data directly to ColumnWriter. Returns
+  /// error status if the array data type is not compatible with the concrete
+  /// writer type
+  virtual ::arrow::Status WriteArrow(const int16_t* def_levels, const int16_t* rep_levels,
+                                     int64_t num_levels, const ::arrow::Array& array,
+                                     ArrowWriteContext* ctx) = 0;
 };
 
 // API to write values to a single column. This is the main client facing API.
@@ -186,4 +197,55 @@ using DoubleWriter = TypedColumnWriter<DoubleType>;
 using ByteArrayWriter = TypedColumnWriter<ByteArrayType>;
 using FixedLenByteArrayWriter = TypedColumnWriter<FLBAType>;
 
+namespace internal {
+
+/**
+ * Timestamp conversion constants
+ */
+constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588);
+
+template <int64_t UnitPerDay, int64_t NanosecondsPerUnit>
+inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) {
+  int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays;
+  impala_timestamp->value[2] = static_cast<uint32_t>(julian_days);
+
+  int64_t last_day_units = time % UnitPerDay;
+  auto last_day_nanos = last_day_units * NanosecondsPerUnit;
+  // impala_timestamp will be unaligned every other entry so do memcpy instead
+  // of assign and reinterpret cast to avoid undefined behavior.
+  std::memcpy(impala_timestamp, &last_day_nanos, sizeof(int64_t));
+}
+
+constexpr int64_t kSecondsInNanos = INT64_C(1000000000);
+
+inline void SecondsToImpalaTimestamp(const int64_t seconds, Int96* impala_timestamp) {
+  ArrowTimestampToImpalaTimestamp<kSecondsPerDay, kSecondsInNanos>(seconds,
+                                                                   impala_timestamp);
+}
+
+constexpr int64_t kMillisecondsInNanos = kSecondsInNanos / INT64_C(1000);
+
+inline void MillisecondsToImpalaTimestamp(const int64_t milliseconds,
+                                          Int96* impala_timestamp) {
+  ArrowTimestampToImpalaTimestamp<kMillisecondsPerDay, kMillisecondsInNanos>(
+      milliseconds, impala_timestamp);
+}
+
+constexpr int64_t kMicrosecondsInNanos = kMillisecondsInNanos / INT64_C(1000);
+
+inline void MicrosecondsToImpalaTimestamp(const int64_t microseconds,
+                                          Int96* impala_timestamp) {
+  ArrowTimestampToImpalaTimestamp<kMicrosecondsPerDay, kMicrosecondsInNanos>(
+      microseconds, impala_timestamp);
+}
+
+constexpr int64_t kNanosecondsInNanos = INT64_C(1);
+
+inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds,
+                                         Int96* impala_timestamp) {
+  ArrowTimestampToImpalaTimestamp<kNanosecondsPerDay, kNanosecondsInNanos>(
+      nanoseconds, impala_timestamp);
+}
+
+}  // namespace internal
 }  // namespace parquet
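
The helpers above split a timestamp into a Julian day number plus nanoseconds
within that day. For a concrete value, one day past the Unix epoch:

    // Exactly one day past the Unix epoch, in nanoseconds.
    parquet::Int96 ts;
    parquet::internal::NanosecondsToImpalaTimestamp(INT64_C(86400000000000), &ts);
    // ts.value[2] == 2440589 (the epoch itself is Julian day 2440588);
    // the first eight bytes hold the within-day nanoseconds, zero here.
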
diff --git a/cpp/src/parquet/properties.cc b/cpp/src/parquet/properties.cc
index 4d3c5d6..e8ffad4 100644
--- a/cpp/src/parquet/properties.cc
+++ b/cpp/src/parquet/properties.cc
@@ -38,4 +38,15 @@ std::shared_ptr<ArrowInputStream> ReaderProperties::GetStream(
   }
 }
 
+ArrowReaderProperties default_arrow_reader_properties() {
+  static ArrowReaderProperties default_reader_props;
+  return default_reader_props;
+}
+
+std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties() {
+  static std::shared_ptr<ArrowWriterProperties> default_writer_properties =
+      ArrowWriterProperties::Builder().build();
+  return default_writer_properties;
+}
+
 }  // namespace parquet
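
Both factories hand out process-wide defaults; callers needing different
settings copy the reader properties or build their own writer properties
rather than mutating the defaults:

    // Reader properties are returned by value, so changes stay local.
    parquet::ArrowReaderProperties props = parquet::default_arrow_reader_properties();
    props.set_use_threads(true);

    // Writer properties are immutable once built.
    std::shared_ptr<parquet::ArrowWriterProperties> wprops =
        parquet::ArrowWriterProperties::Builder()
            .coerce_timestamps(::arrow::TimeUnit::MILLI)
            ->allow_truncated_timestamps()
            ->build();
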
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index d08d7b0..40a358c 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -21,6 +21,9 @@
 #include <memory>
 #include <string>
 #include <unordered_map>
+#include <unordered_set>
+
+#include "arrow/type.h"
 
 #include "parquet/exception.h"
 #include "parquet/parquet_version.h"
@@ -39,13 +42,13 @@ static bool DEFAULT_USE_BUFFERED_STREAM = false;
 
 class PARQUET_EXPORT ReaderProperties {
  public:
-  explicit ReaderProperties(::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+  explicit ReaderProperties(MemoryPool* pool = ::arrow::default_memory_pool())
       : pool_(pool) {
     buffered_stream_enabled_ = DEFAULT_USE_BUFFERED_STREAM;
     buffer_size_ = DEFAULT_BUFFER_SIZE;
   }
 
-  ::arrow::MemoryPool* memory_pool() const { return pool_; }
+  MemoryPool* memory_pool() const { return pool_; }
 
   std::shared_ptr<ArrowInputStream> GetStream(std::shared_ptr<ArrowInputFile> source,
                                               int64_t start, int64_t num_bytes);
@@ -61,7 +64,7 @@ class PARQUET_EXPORT ReaderProperties {
   int64_t buffer_size() const { return buffer_size_; }
 
  private:
-  ::arrow::MemoryPool* pool_;
+  MemoryPool* pool_;
   int64_t buffer_size_;
   bool buffered_stream_enabled_;
 };
@@ -142,7 +145,7 @@ class PARQUET_EXPORT WriterProperties {
           created_by_(DEFAULT_CREATED_BY) {}
     virtual ~Builder() {}
 
-    Builder* memory_pool(::arrow::MemoryPool* pool) {
+    Builder* memory_pool(MemoryPool* pool) {
       pool_ = pool;
       return this;
     }
@@ -320,7 +323,7 @@ class PARQUET_EXPORT WriterProperties {
     }
 
    private:
-    ::arrow::MemoryPool* pool_;
+    MemoryPool* pool_;
     int64_t dictionary_pagesize_limit_;
     int64_t write_batch_size_;
     int64_t max_row_group_length_;
@@ -336,7 +339,7 @@ class PARQUET_EXPORT WriterProperties {
     std::unordered_map<std::string, bool> statistics_enabled_;
   };
 
-  inline ::arrow::MemoryPool* memory_pool() const { return pool_; }
+  inline MemoryPool* memory_pool() const { return pool_; }
 
   inline int64_t dictionary_pagesize_limit() const { return dictionary_pagesize_limit_; }
 
@@ -395,10 +398,9 @@ class PARQUET_EXPORT WriterProperties {
 
  private:
   explicit WriterProperties(
-      ::arrow::MemoryPool* pool, int64_t dictionary_pagesize_limit,
-      int64_t write_batch_size, int64_t max_row_group_length, int64_t pagesize,
-      ParquetVersion::type version, const std::string& created_by,
-      const ColumnProperties& default_column_properties,
+      MemoryPool* pool, int64_t dictionary_pagesize_limit, int64_t write_batch_size,
+      int64_t max_row_group_length, int64_t pagesize, ParquetVersion::type version,
+      const std::string& created_by, const ColumnProperties& default_column_properties,
       const std::unordered_map<std::string, ColumnProperties>& column_properties)
       : pool_(pool),
         dictionary_pagesize_limit_(dictionary_pagesize_limit),
@@ -410,7 +412,7 @@ class PARQUET_EXPORT WriterProperties {
         default_column_properties_(default_column_properties),
         column_properties_(column_properties) {}
 
-  ::arrow::MemoryPool* pool_;
+  MemoryPool* pool_;
   int64_t dictionary_pagesize_limit_;
   int64_t write_batch_size_;
   int64_t max_row_group_length_;
@@ -423,6 +425,161 @@ class PARQUET_EXPORT WriterProperties {
 
 std::shared_ptr<WriterProperties> PARQUET_EXPORT default_writer_properties();
 
+// ----------------------------------------------------------------------
+// Properties specific to Apache Arrow columnar read and write
+
+static constexpr bool kArrowDefaultUseThreads = false;
+
+// Default number of rows to read when using ::arrow::RecordBatchReader
+static constexpr int64_t kArrowDefaultBatchSize = 64 * 1024;
+
+/// EXPERIMENTAL: Properties for configuring FileReader behavior.
+class PARQUET_EXPORT ArrowReaderProperties {
+ public:
+  explicit ArrowReaderProperties(bool use_threads = kArrowDefaultUseThreads)
+      : use_threads_(use_threads),
+        read_dict_indices_(),
+        batch_size_(kArrowDefaultBatchSize) {}
+
+  void set_use_threads(bool use_threads) { use_threads_ = use_threads; }
+
+  bool use_threads() const { return use_threads_; }
+
+  void set_read_dictionary(int column_index, bool read_dict) {
+    if (read_dict) {
+      read_dict_indices_.insert(column_index);
+    } else {
+      read_dict_indices_.erase(column_index);
+    }
+  }
+  bool read_dictionary(int column_index) const {
+    return read_dict_indices_.find(column_index) != read_dict_indices_.end();
+  }
+
+  void set_batch_size(int64_t batch_size) { batch_size_ = batch_size; }
+
+  int64_t batch_size() const { return batch_size_; }
+
+ private:
+  bool use_threads_;
+  std::unordered_set<int> read_dict_indices_;
+  int64_t batch_size_;
+};
+
+/// EXPERIMENTAL: Constructs the default ArrowReaderProperties
+PARQUET_EXPORT
+ArrowReaderProperties default_arrow_reader_properties();
+
+class PARQUET_EXPORT ArrowWriterProperties {
+ public:
+  class Builder {
+   public:
+    Builder()
+        : write_timestamps_as_int96_(false),
+          coerce_timestamps_enabled_(false),
+          coerce_timestamps_unit_(::arrow::TimeUnit::SECOND),
+          truncated_timestamps_allowed_(false) {}
+    virtual ~Builder() {}
+
+    Builder* disable_deprecated_int96_timestamps() {
+      write_timestamps_as_int96_ = false;
+      return this;
+    }
+
+    Builder* enable_deprecated_int96_timestamps() {
+      write_timestamps_as_int96_ = true;
+      return this;
+    }
+
+    Builder* coerce_timestamps(::arrow::TimeUnit::type unit) {
+      coerce_timestamps_enabled_ = true;
+      coerce_timestamps_unit_ = unit;
+      return this;
+    }
+
+    Builder* allow_truncated_timestamps() {
+      truncated_timestamps_allowed_ = true;
+      return this;
+    }
+
+    Builder* disallow_truncated_timestamps() {
+      truncated_timestamps_allowed_ = false;
+      return this;
+    }
+
+    std::shared_ptr<ArrowWriterProperties> build() {
+      return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties(
+          write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_,
+          truncated_timestamps_allowed_));
+    }
+
+   private:
+    bool write_timestamps_as_int96_;
+
+    bool coerce_timestamps_enabled_;
+    ::arrow::TimeUnit::type coerce_timestamps_unit_;
+    bool truncated_timestamps_allowed_;
+  };
+
+  bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; }
+
+  bool coerce_timestamps_enabled() const { return coerce_timestamps_enabled_; }
+  ::arrow::TimeUnit::type coerce_timestamps_unit() const {
+    return coerce_timestamps_unit_;
+  }
+
+  bool truncated_timestamps_allowed() const { return truncated_timestamps_allowed_; }
+
+ private:
+  explicit ArrowWriterProperties(bool write_nanos_as_int96,
+                                 bool coerce_timestamps_enabled,
+                                 ::arrow::TimeUnit::type coerce_timestamps_unit,
+                                 bool truncated_timestamps_allowed)
+      : write_timestamps_as_int96_(write_nanos_as_int96),
+        coerce_timestamps_enabled_(coerce_timestamps_enabled),
+        coerce_timestamps_unit_(coerce_timestamps_unit),
+        truncated_timestamps_allowed_(truncated_timestamps_allowed) {}
+
+  const bool write_timestamps_as_int96_;
+  const bool coerce_timestamps_enabled_;
+  const ::arrow::TimeUnit::type coerce_timestamps_unit_;
+  const bool truncated_timestamps_allowed_;
+};
+
+/// \brief State object used for writing Arrow data directly to a Parquet
+/// column chunk. This API is experimental and may change
+struct ArrowWriteContext {
+  ArrowWriteContext(MemoryPool* memory_pool, ArrowWriterProperties* properties)
+      : memory_pool(memory_pool),
+        properties(properties),
+        data_buffer(AllocateBuffer(memory_pool)),
+        def_levels_buffer(AllocateBuffer(memory_pool)) {}
+
+  template <typename T>
+  ::arrow::Status GetScratchData(const int64_t num_values, T** out) {
+    ARROW_RETURN_NOT_OK(this->data_buffer->Resize(num_values * sizeof(T), false));
+    *out = reinterpret_cast<T*>(this->data_buffer->mutable_data());
+    return ::arrow::Status::OK();
+  }
+
+  MemoryPool* memory_pool;
+  const ArrowWriterProperties* properties;
+
+  // Buffer used for storing the data of an array converted to the physical type
+  // as expected by parquet-cpp.
+  std::shared_ptr<ResizableBuffer> data_buffer;
+
+  // Buffer for definition levels; held by shared_ptr because ownership is
+  // shared with the code that generates the levels
+  std::shared_ptr<ResizableBuffer> def_levels_buffer;
+};
+
+PARQUET_EXPORT
+std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties();
+
 }  // namespace parquet
 
 #endif  // PARQUET_COLUMN_PROPERTIES_H
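
GetScratchData resizes the shared data_buffer without shrinking it, so
consecutive column writes reuse one allocation. A sketch of that pattern,
inside a function that returns ::arrow::Status:

    parquet::ArrowWriteContext ctx(::arrow::default_memory_pool(),
                                   parquet::default_arrow_writer_properties().get());
    int32_t* scratch32 = nullptr;
    ARROW_RETURN_NOT_OK(ctx.GetScratchData<int32_t>(1024, &scratch32));
    // The next request grows and reuses the same underlying buffer.
    int64_t* scratch64 = nullptr;
    ARROW_RETURN_NOT_OK(ctx.GetScratchData<int64_t>(4096, &scratch64));
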
diff --git a/cpp/src/parquet/schema.cc b/cpp/src/parquet/schema.cc
index e961127..1bada62 100644
--- a/cpp/src/parquet/schema.cc
+++ b/cpp/src/parquet/schema.cc
@@ -25,7 +25,6 @@
 #include <utility>
 
 #include "arrow/util/logging.h"
-#include "arrow/util/macros.h"
 
 #include "parquet/exception.h"
 #include "parquet/schema-internal.h"
diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc
index 822afcd..f7e0cf3 100644
--- a/cpp/src/parquet/types.cc
+++ b/cpp/src/parquet/types.cc
@@ -1608,4 +1608,80 @@ class LogicalType::Impl::Unknown final : public LogicalType::Impl::SimpleCompati
 
 GENERATE_MAKE(Unknown)
 
+namespace internal {
+
+/// \brief Compute the number of bytes required to represent a decimal of a
+/// given precision. Taken from the Apache Impala codebase. The comment next
+/// to each return value gives the maximum value representable in two's
+/// complement with that number of bytes.
+int32_t DecimalSize(int32_t precision) {
+  DCHECK_GE(precision, 1) << "decimal precision must be greater than or equal to 1, got "
+                          << precision;
+  DCHECK_LE(precision, 38) << "decimal precision must be less than or equal to 38, got "
+                           << precision;
+
+  switch (precision) {
+    case 1:
+    case 2:
+      return 1;  // 127
+    case 3:
+    case 4:
+      return 2;  // 32,767
+    case 5:
+    case 6:
+      return 3;  // 8,388,607
+    case 7:
+    case 8:
+    case 9:
+      return 4;  // 2,147,483,647
+    case 10:
+    case 11:
+      return 5;  // 549,755,813,887
+    case 12:
+    case 13:
+    case 14:
+      return 6;  // 140,737,488,355,327
+    case 15:
+    case 16:
+      return 7;  // 36,028,797,018,963,967
+    case 17:
+    case 18:
+      return 8;  // 9,223,372,036,854,775,807
+    case 19:
+    case 20:
+    case 21:
+      return 9;  // 2,361,183,241,434,822,606,847
+    case 22:
+    case 23:
+      return 10;  // 604,462,909,807,314,587,353,087
+    case 24:
+    case 25:
+    case 26:
+      return 11;  // 154,742,504,910,672,534,362,390,527
+    case 27:
+    case 28:
+      return 12;  // 39,614,081,257,132,168,796,771,975,167
+    case 29:
+    case 30:
+    case 31:
+      return 13;  // 10,141,204,801,825,835,211,973,625,643,007
+    case 32:
+    case 33:
+      return 14;  // 2,596,148,429,267,413,814,265,248,164,610,047
+    case 34:
+    case 35:
+      return 15;  // 664,613,997,892,457,936,451,903,530,140,172,287
+    case 36:
+    case 37:
+    case 38:
+      return 16;  // 170,141,183,460,469,231,731,687,303,715,884,105,727
+    default:
+      break;
+  }
+  DCHECK(false);
+  return -1;
+}
+
+}  // namespace internal
+
 }  // namespace parquet
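
The DecimalSize lookup is equivalent to the closed form "smallest byte count
whose two's-complement range holds 10^precision - 1", which can serve as a
cross-check against the table:

    #include <cmath>
    #include <cstdint>

    // precision decimal digits need precision * log2(10) bits, plus a sign bit.
    int32_t MinDecimalBytes(int32_t precision) {
      const double bits = precision * std::log2(10.0) + 1.0;
      return static_cast<int32_t>(std::ceil(bits / 8.0));
    }
    // MinDecimalBytes(p) == parquet::internal::DecimalSize(p) for p in [1, 38].
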
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index b317784..bc456ea 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -678,6 +678,12 @@ PARQUET_EXPORT SortOrder::type GetSortOrder(ConvertedType::type converted,
 PARQUET_EXPORT SortOrder::type GetSortOrder(
     const std::shared_ptr<const LogicalType>& logical_type, Type::type primitive);
 
+namespace internal {
+
+PARQUET_EXPORT
+int32_t DecimalSize(int32_t precision);
+
+}  // namespace internal
 }  // namespace parquet
 
 #endif  // PARQUET_TYPES_H
diff --git a/dev/lint/run_iwyu.sh b/dev/lint/run_iwyu.sh
index edae841..46517cf 100755
--- a/dev/lint/run_iwyu.sh
+++ b/dev/lint/run_iwyu.sh
@@ -22,6 +22,7 @@ mkdir -p /build/lint
 pushd /build/lint
 
 cmake -GNinja \
+      -DCMAKE_BUILD_TYPE=debug \
       -DARROW_FLIGHT=ON \
       -DARROW_GANDIVA=ON \
       -DARROW_PARQUET=ON \
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index 32ee1ad..82ca9fb 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -327,6 +327,15 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
 
     ReaderProperties default_reader_properties()
 
+    cdef cppclass ArrowReaderProperties:
+        ArrowReaderProperties()
+        void set_read_dictionary(int column_index, c_bool read_dict)
+        c_bool read_dictionary(int column_index)
+        void set_batch_size(int64_t batch_size)
+        int64_t batch_size()
+
+    ArrowReaderProperties default_arrow_reader_properties()
+
     cdef cppclass ParquetFileReader:
         shared_ptr[CFileMetaData] metadata()
 
@@ -348,17 +357,19 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
             Builder* write_batch_size(int64_t batch_size)
             shared_ptr[WriterProperties] build()
 
+    cdef cppclass ArrowWriterProperties:
+        cppclass Builder:
+            Builder()
+            Builder* disable_deprecated_int96_timestamps()
+            Builder* enable_deprecated_int96_timestamps()
+            Builder* coerce_timestamps(TimeUnit unit)
+            Builder* allow_truncated_timestamps()
+            Builder* disallow_truncated_timestamps()
+            shared_ptr[ArrowWriterProperties] build()
+        c_bool support_deprecated_int96_timestamps()
 
-cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
-    cdef cppclass ArrowReaderProperties:
-        ArrowReaderProperties()
-        void set_read_dictionary(int column_index, c_bool read_dict)
-        c_bool read_dictionary()
-        void set_batch_size()
-        int64_t batch_size()
-
-    ArrowReaderProperties default_arrow_reader_properties()
 
+cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
     cdef cppclass FileReader:
         FileReader(CMemoryPool* pool, unique_ptr[ParquetFileReader] reader)
         CStatus ReadColumn(int i, shared_ptr[CChunkedArray]* out)
@@ -422,17 +433,6 @@ cdef extern from "parquet/arrow/writer.h" namespace "parquet::arrow" nogil:
 
         const shared_ptr[CFileMetaData] metadata() const
 
-    cdef cppclass ArrowWriterProperties:
-        cppclass Builder:
-            Builder()
-            Builder* disable_deprecated_int96_timestamps()
-            Builder* enable_deprecated_int96_timestamps()
-            Builder* coerce_timestamps(TimeUnit unit)
-            Builder* allow_truncated_timestamps()
-            Builder* disallow_truncated_timestamps()
-            shared_ptr[ArrowWriterProperties] build()
-        c_bool support_deprecated_int96_timestamps()
-
     CStatus WriteMetaDataFile(
         const CFileMetaData& file_metadata,
         const OutputStream* sink)
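
With both property classes now exposed from the parquet namespace (via
parquet/properties.h), typical C++ construction looks like the following
sketch (assuming the C++ signatures mirror the Cython declarations above;
the time unit is spelled ::arrow::TimeUnit in C++):

    #include <memory>

    #include "parquet/properties.h"

    void ConfigureProperties() {
      // Reader side: request dictionary-encoded output for column 0.
      parquet::ArrowReaderProperties reader_props =
          parquet::default_arrow_reader_properties();
      reader_props.set_read_dictionary(/*column_index=*/0, /*read_dict=*/true);

      // Writer side: each Builder method returns the builder, so calls chain.
      std::shared_ptr<parquet::ArrowWriterProperties> writer_props =
          parquet::ArrowWriterProperties::Builder()
              .coerce_timestamps(::arrow::TimeUnit::MICRO)
              ->allow_truncated_timestamps()
              ->build();
    }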
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 8cfaa77..5af51c5 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -2754,7 +2754,7 @@ RcppExport SEXP _arrow_ipc___ReadMessage(SEXP stream_sexp){
 
 // parquet.cpp
 #if defined(ARROW_R_WITH_ARROW)
-std::shared_ptr<parquet::arrow::ArrowReaderProperties> parquet___arrow___ArrowReaderProperties__Make(bool use_threads);
+std::shared_ptr<parquet::ArrowReaderProperties> parquet___arrow___ArrowReaderProperties__Make(bool use_threads);
 RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__Make(SEXP use_threads_sexp){
 BEGIN_RCPP
 	Rcpp::traits::input_parameter<bool>::type use_threads(use_threads_sexp);
@@ -2769,10 +2769,10 @@ RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__Make(SEXP use_th
 
 // parquet.cpp
 #if defined(ARROW_R_WITH_ARROW)
-void parquet___arrow___ArrowReaderProperties__set_use_threads(const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& properties, bool use_threads);
+void parquet___arrow___ArrowReaderProperties__set_use_threads(const std::shared_ptr<parquet::ArrowReaderProperties>& properties, bool use_threads);
 RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__set_use_threads(SEXP properties_sexp, SEXP use_threads_sexp){
 BEGIN_RCPP
-	Rcpp::traits::input_parameter<const std::shared_ptr<parquet::arrow::ArrowReaderProperties>&>::type properties(properties_sexp);
+	Rcpp::traits::input_parameter<const std::shared_ptr<parquet::ArrowReaderProperties>&>::type properties(properties_sexp);
 	Rcpp::traits::input_parameter<bool>::type use_threads(use_threads_sexp);
 	parquet___arrow___ArrowReaderProperties__set_use_threads(properties, use_threads);
 	return R_NilValue;
@@ -2786,10 +2786,10 @@ RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__set_use_threads(
 
 // parquet.cpp
 #if defined(ARROW_R_WITH_ARROW)
-bool parquet___arrow___ArrowReaderProperties__get_use_threads(const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& properties, bool use_threads);
+bool parquet___arrow___ArrowReaderProperties__get_use_threads(const std::shared_ptr<parquet::ArrowReaderProperties>& properties, bool use_threads);
 RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__get_use_threads(SEXP properties_sexp, SEXP use_threads_sexp){
 BEGIN_RCPP
-	Rcpp::traits::input_parameter<const std::shared_ptr<parquet::arrow::ArrowReaderProperties>&>::type properties(properties_sexp);
+	Rcpp::traits::input_parameter<const std::shared_ptr<parquet::ArrowReaderProperties>&>::type properties(properties_sexp);
 	Rcpp::traits::input_parameter<bool>::type use_threads(use_threads_sexp);
 	return Rcpp::wrap(parquet___arrow___ArrowReaderProperties__get_use_threads(properties, use_threads));
 END_RCPP
@@ -2802,10 +2802,10 @@ RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__get_use_threads(
 
 // parquet.cpp
 #if defined(ARROW_R_WITH_ARROW)
-bool parquet___arrow___ArrowReaderProperties__get_read_dictionary(const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& properties, int column_index);
+bool parquet___arrow___ArrowReaderProperties__get_read_dictionary(const std::shared_ptr<parquet::ArrowReaderProperties>& properties, int column_index);
 RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__get_read_dictionary(SEXP properties_sexp, SEXP column_index_sexp){
 BEGIN_RCPP
-	Rcpp::traits::input_parameter<const std::shared_ptr<parquet::arrow::ArrowReaderProperties>&>::type properties(properties_sexp);
+	Rcpp::traits::input_parameter<const std::shared_ptr<parquet::ArrowReaderProperties>&>::type properties(properties_sexp);
 	Rcpp::traits::input_parameter<int>::type column_index(column_index_sexp);
 	return Rcpp::wrap(parquet___arrow___ArrowReaderProperties__get_read_dictionary(properties, column_index));
 END_RCPP
@@ -2818,10 +2818,10 @@ RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__get_read_diction
 
 // parquet.cpp
 #if defined(ARROW_R_WITH_ARROW)
-void parquet___arrow___ArrowReaderProperties__set_read_dictionary(const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& properties, int column_index, bool read_dict);
+void parquet___arrow___ArrowReaderProperties__set_read_dictionary(const std::shared_ptr<parquet::ArrowReaderProperties>& properties, int column_index, bool read_dict);
 RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__set_read_dictionary(SEXP properties_sexp, SEXP column_index_sexp, SEXP read_dict_sexp){
 BEGIN_RCPP
-	Rcpp::traits::input_parameter<const std::shared_ptr<parquet::arrow::ArrowReaderProperties>&>::type properties(properties_sexp);
+	Rcpp::traits::input_parameter<const std::shared_ptr<parquet::ArrowReaderProperties>&>::type properties(properties_sexp);
 	Rcpp::traits::input_parameter<int>::type column_index(column_index_sexp);
 	Rcpp::traits::input_parameter<bool>::type read_dict(read_dict_sexp);
 	parquet___arrow___ArrowReaderProperties__set_read_dictionary(properties, column_index, read_dict);
@@ -2836,11 +2836,11 @@ RcppExport SEXP _arrow_parquet___arrow___ArrowReaderProperties__set_read_diction
 
 // parquet.cpp
 #if defined(ARROW_R_WITH_ARROW)
-std::unique_ptr<parquet::arrow::FileReader> parquet___arrow___FileReader__OpenFile(const std::shared_ptr<arrow::io::RandomAccessFile>& file, const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& props);
+std::unique_ptr<parquet::arrow::FileReader> parquet___arrow___FileReader__OpenFile(const std::shared_ptr<arrow::io::RandomAccessFile>& file, const std::shared_ptr<parquet::ArrowReaderProperties>& props);
 RcppExport SEXP _arrow_parquet___arrow___FileReader__OpenFile(SEXP file_sexp, SEXP props_sexp){
 BEGIN_RCPP
 	Rcpp::traits::input_parameter<const std::shared_ptr<arrow::io::RandomAccessFile>&>::type file(file_sexp);
-	Rcpp::traits::input_parameter<const std::shared_ptr<parquet::arrow::ArrowReaderProperties>&>::type props(props_sexp);
+	Rcpp::traits::input_parameter<const std::shared_ptr<parquet::ArrowReaderProperties>&>::type props(props_sexp);
 	return Rcpp::wrap(parquet___arrow___FileReader__OpenFile(file, props));
 END_RCPP
 }
diff --git a/r/src/parquet.cpp b/r/src/parquet.cpp
index 5124e9e..692916b 100644
--- a/r/src/parquet.cpp
+++ b/r/src/parquet.cpp
@@ -24,35 +24,35 @@
 #include <parquet/exception.h>
 
 // [[arrow::export]]
-std::shared_ptr<parquet::arrow::ArrowReaderProperties>
+std::shared_ptr<parquet::ArrowReaderProperties>
 parquet___arrow___ArrowReaderProperties__Make(bool use_threads) {
-  return std::make_shared<parquet::arrow::ArrowReaderProperties>(use_threads);
+  return std::make_shared<parquet::ArrowReaderProperties>(use_threads);
 }
 
 // [[arrow::export]]
 void parquet___arrow___ArrowReaderProperties__set_use_threads(
-    const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& properties,
+    const std::shared_ptr<parquet::ArrowReaderProperties>& properties,
     bool use_threads) {
   properties->set_use_threads(use_threads);
 }
 
 // [[arrow::export]]
 bool parquet___arrow___ArrowReaderProperties__get_use_threads(
-    const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& properties,
+    const std::shared_ptr<parquet::ArrowReaderProperties>& properties,
     bool use_threads) {
   return properties->use_threads();
 }
 
 // [[arrow::export]]
 bool parquet___arrow___ArrowReaderProperties__get_read_dictionary(
-    const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& properties,
+    const std::shared_ptr<parquet::ArrowReaderProperties>& properties,
     int column_index) {
   return properties->read_dictionary(column_index);
 }
 
 // [[arrow::export]]
 void parquet___arrow___ArrowReaderProperties__set_read_dictionary(
-    const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& properties,
+    const std::shared_ptr<parquet::ArrowReaderProperties>& properties,
     int column_index, bool read_dict) {
   properties->set_read_dictionary(column_index, read_dict);
 }
@@ -60,10 +60,11 @@ void parquet___arrow___ArrowReaderProperties__set_read_dictionary(
 // [[arrow::export]]
 std::unique_ptr<parquet::arrow::FileReader> parquet___arrow___FileReader__OpenFile(
     const std::shared_ptr<arrow::io::RandomAccessFile>& file,
-    const std::shared_ptr<parquet::arrow::ArrowReaderProperties>& props) {
+    const std::shared_ptr<parquet::ArrowReaderProperties>& props) {
   std::unique_ptr<parquet::arrow::FileReader> reader;
-  PARQUET_THROW_NOT_OK(
-      parquet::arrow::OpenFile(file, arrow::default_memory_pool(), *props, &reader));
+  parquet::arrow::FileReaderBuilder builder;
+  PARQUET_THROW_NOT_OK(builder.Open(file));
+  PARQUET_THROW_NOT_OK(builder.properties(*props)->Build(&reader));
   return reader;
 }
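
The same FileReaderBuilder pattern used in the R binding above, written as
standalone C++ (a sketch; OpenReader is a hypothetical wrapper, and it
propagates errors with ARROW_RETURN_NOT_OK instead of the binding's
PARQUET_THROW_NOT_OK):

    #include <memory>

    #include "arrow/io/interfaces.h"
    #include "arrow/status.h"
    #include "parquet/arrow/reader.h"
    #include "parquet/properties.h"

    arrow::Status OpenReader(
        const std::shared_ptr<arrow::io::RandomAccessFile>& file,
        std::unique_ptr<parquet::arrow::FileReader>* reader) {
      parquet::arrow::FileReaderBuilder builder;
      ARROW_RETURN_NOT_OK(builder.Open(file));
      return builder.properties(parquet::default_arrow_reader_properties())
          ->Build(reader);
    }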