You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2017/03/07 20:22:01 UTC

parquet-cpp git commit: PARQUET-890: Support I/O of DATE columns in parquet_arrow

Repository: parquet-cpp
Updated Branches:
  refs/heads/master fe8f98d0e -> c41a718da


PARQUET-890: Support I/O of DATE columns in parquet_arrow

Also fixes a bug on reading INT96 timestamps.

Author: Korn, Uwe <Uw...@blue-yonder.com>

Closes #266 from xhochy/PARQUET-890 and squashes the following commits:

8481c2c [Korn, Uwe] ninja lint
666d41b [Korn, Uwe] PARQUET-890: Support I/O of DATE columns in parquet_arrow


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/c41a718d
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/c41a718d
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/c41a718d

Branch: refs/heads/master
Commit: c41a718dae9c60465ea0d8c99d6e3bdca11f802f
Parents: fe8f98d
Author: Korn, Uwe <Uw...@blue-yonder.com>
Authored: Tue Mar 7 15:21:51 2017 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue Mar 7 15:21:51 2017 -0500

----------------------------------------------------------------------
 src/parquet/arrow/arrow-reader-writer-test.cc |  13 ++-
 src/parquet/arrow/arrow-schema-test.cc        |  11 ++-
 src/parquet/arrow/reader.cc                   |  49 +++++++++-
 src/parquet/arrow/schema.cc                   |  18 +++-
 src/parquet/arrow/test-util.h                 |  49 +++++++++-
 src/parquet/arrow/writer.cc                   | 106 ++++++++++++++-------
 6 files changed, 200 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c41a718d/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 2dfdbd2..a0a39f1 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -149,6 +149,15 @@ struct test_traits<::arrow::TimestampType> {
 const int64_t test_traits<::arrow::TimestampType>::value(14695634030000);
 
 template <>
+struct test_traits<::arrow::DateType> {
+  static constexpr ParquetType::type parquet_enum = ParquetType::INT32;
+  static constexpr LogicalType::type logical_enum = LogicalType::DATE;
+  static int64_t const value;
+};
+
+const int64_t test_traits<::arrow::DateType>::value(14688000000000);
+
+template <>
 struct test_traits<::arrow::FloatType> {
   static constexpr ParquetType::type parquet_enum = ParquetType::FLOAT;
   static constexpr LogicalType::type logical_enum = LogicalType::NONE;
@@ -309,8 +318,8 @@ class TestParquetIO : public ::testing::Test {
 
 typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
     ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type,
-    ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType,
-    ::arrow::StringType, ::arrow::BinaryType>
+    ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::DateType, ::arrow::FloatType,
+    ::arrow::DoubleType, ::arrow::StringType, ::arrow::BinaryType>
     TestTypes;
 
 TYPED_TEST_CASE(TestParquetIO, TestTypes);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c41a718d/src/parquet/arrow/arrow-schema-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index 43e57d8..8db792f 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -98,6 +98,10 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
       ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
   arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
 
+  parquet_fields.push_back(PrimitiveNode::Make(
+      "date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE));
+  arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date(), false));
+
   parquet_fields.push_back(
       PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96));
   arrow_fields.push_back(std::make_shared<Field>("timestamp96", TIMESTAMP_NS, false));
@@ -339,9 +343,6 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
 TEST_F(TestConvertParquetSchema, UnsupportedThings) {
   std::vector<NodePtr> unsupported_nodes;
 
-  unsupported_nodes.push_back(PrimitiveNode::Make(
-      "int32", Repetition::OPTIONAL, ParquetType::INT32, LogicalType::DATE));
-
   for (const NodePtr& node : unsupported_nodes) {
     ASSERT_RAISES(NotImplemented, ConvertSchema({node}));
   }
@@ -394,6 +395,10 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
       PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64));
   arrow_fields.push_back(std::make_shared<Field>("int64", INT64, false));
 
+  parquet_fields.push_back(PrimitiveNode::Make(
+      "date", Repetition::REQUIRED, ParquetType::INT32, LogicalType::DATE));
+  arrow_fields.push_back(std::make_shared<Field>("date", ::arrow::date(), false));
+
   parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
       ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
   arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c41a718d/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index df34d4c..73f6d87 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -361,7 +361,7 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Typ
   PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch(
                            values_to_read, nullptr, nullptr, values, &values_read));
 
-  int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
+  int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_) + valid_bits_idx_;
   for (int64_t i = 0; i < values_read; i++) {
     *out_ptr++ = impala_timestamp_to_nanoseconds(values[i]);
   }
@@ -371,6 +371,24 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Typ
 }
 
 template <>
+Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::DateType, Int32Type>(
+    TypedColumnReader<Int32Type>* reader, int64_t values_to_read, int64_t* levels_read) {
+  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
+  auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data());
+  int64_t values_read;
+  PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch(
+                           values_to_read, nullptr, nullptr, values, &values_read));
+
+  int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_) + valid_bits_idx_;
+  for (int64_t i = 0; i < values_read; i++) {
+    *out_ptr++ = static_cast<int64_t>(values[i]) * 86400000;
+  }
+  valid_bits_idx_ += values_read;
+
+  return Status::OK();
+}
+
+template <>
 Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
     TypedColumnReader<BooleanType>* reader, int64_t values_to_read,
     int64_t* levels_read) {
@@ -464,6 +482,30 @@ Status ColumnReader::Impl::ReadNullableBatch<::arrow::TimestampType, Int96Type>(
 }
 
 template <>
+Status ColumnReader::Impl::ReadNullableBatch<::arrow::DateType, Int32Type>(
+    TypedColumnReader<Int32Type>* reader, int16_t* def_levels, int16_t* rep_levels,
+    int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
+  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
+  auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data());
+  int64_t null_count;
+  PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(values_to_read, def_levels, rep_levels,
+      values, valid_bits_ptr_, valid_bits_idx_, levels_read, values_read, &null_count));
+
+  auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
+  INIT_BITSET(valid_bits_ptr_, valid_bits_idx_);
+  for (int64_t i = 0; i < *values_read; i++) {
+    if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
+      data_ptr[valid_bits_idx_ + i] = static_cast<int64_t>(values[i]) * 86400000;
+    }
+    READ_NEXT_BITSET(valid_bits_ptr_);
+  }
+  null_count_ += null_count;
+  valid_bits_idx_ += *values_read;
+
+  return Status::OK();
+}
+
+template <>
 Status ColumnReader::Impl::ReadNullableBatch<::arrow::BooleanType, BooleanType>(
     TypedColumnReader<BooleanType>* reader, int16_t* def_levels, int16_t* rep_levels,
     int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
@@ -843,6 +885,7 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out
     TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
     TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
     TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
+    TYPED_BATCH_CASE(DATE, ::arrow::DateType, Int32Type)
     TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
     TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
     TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
@@ -865,7 +908,9 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out
       break;
     }
     default:
-      return Status::NotImplemented(field_->type->ToString());
+      std::stringstream ss;
+      ss << "No support for reading columns of type " << field_->type->ToString();
+      return Status::NotImplemented(ss.str());
   }
 }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c41a718d/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 65e3381..0c336d9 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -77,7 +77,10 @@ static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) {
       *out = MakeDecimalType(node);
       break;
     default:
-      return Status::NotImplemented("unhandled type");
+      std::stringstream ss;
+      ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
+         << " for fixed-length binary array";
+      return Status::NotImplemented(ss.str());
       break;
   }
 
@@ -104,11 +107,17 @@ static Status FromInt32(const PrimitiveNode* node, TypePtr* out) {
     case LogicalType::UINT_32:
       *out = ::arrow::uint32();
       break;
+    case LogicalType::DATE:
+      *out = ::arrow::date();
+      break;
     case LogicalType::DECIMAL:
       *out = MakeDecimalType(node);
       break;
     default:
-      return Status::NotImplemented("Unhandled logical type for int32");
+      std::stringstream ss;
+      ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
+         << " for INT32";
+      return Status::NotImplemented(ss.str());
       break;
   }
   return Status::OK();
@@ -129,7 +138,10 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
       *out = TIMESTAMP_MS;
       break;
     default:
-      return Status::NotImplemented("Unhandled logical type for int64");
+      std::stringstream ss;
+      ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
+         << " for INT64";
+      return Status::NotImplemented(ss.str());
       break;
   }
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c41a718d/src/parquet/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index bfc9ce1..07f1f28 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -34,6 +34,9 @@ template <typename ArrowType>
 using is_arrow_int = std::is_integral<typename ArrowType::c_type>;
 
 template <typename ArrowType>
+using is_arrow_date = std::is_same<ArrowType, ::arrow::DateType>;
+
+template <typename ArrowType>
 using is_arrow_string = std::is_same<ArrowType, ::arrow::StringType>;
 
 template <typename ArrowType>
@@ -53,10 +56,27 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NonNullA
 }
 
 template <class ArrowType>
-typename std::enable_if<is_arrow_int<ArrowType>::value, Status>::type NonNullArray(
+typename std::enable_if<
+    is_arrow_int<ArrowType>::value && !is_arrow_date<ArrowType>::value, Status>::type
+NonNullArray(size_t size, std::shared_ptr<Array>* out) {
+  std::vector<typename ArrowType::c_type> values;
+  ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+
+  // Passing data type so this will work with TimestampType too
+  ::arrow::NumericBuilder<ArrowType> builder(
+      ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size());
+  return builder.Finish(out);
+}
+
+template <class ArrowType>
+typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NonNullArray(
     size_t size, std::shared_ptr<Array>* out) {
   std::vector<typename ArrowType::c_type> values;
   ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+  for (size_t i = 0; i < size; i++) {
+    values[i] *= 86400000;
+  }
 
   // Passing data type so this will work with TimestampType too
   ::arrow::NumericBuilder<ArrowType> builder(
@@ -107,13 +127,38 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type Nullable
 
 // This helper function only supports (size/2) nulls.
 template <typename ArrowType>
-typename std::enable_if<is_arrow_int<ArrowType>::value, Status>::type NullableArray(
+typename std::enable_if<
+    is_arrow_int<ArrowType>::value && !is_arrow_date<ArrowType>::value, Status>::type
+NullableArray(size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
+  std::vector<typename ArrowType::c_type> values;
+
+  // Seed is random in Arrow right now
+  (void)seed;
+  ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  // Passing data type so this will work with TimestampType too
+  ::arrow::NumericBuilder<ArrowType> builder(
+      ::arrow::default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size(), valid_bytes.data());
+  return builder.Finish(out);
+}
+
+template <typename ArrowType>
+typename std::enable_if<is_arrow_date<ArrowType>::value, Status>::type NullableArray(
     size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
   std::vector<typename ArrowType::c_type> values;
 
   // Seed is random in Arrow right now
   (void)seed;
   ::arrow::test::randint<typename ArrowType::c_type>(size, 0, 64, &values);
+  for (size_t i = 0; i < size; i++) {
+    values[i] *= 86400000;
+  }
   std::vector<uint8_t> valid_bytes(size, 1);
 
   for (size_t i = 0; i < num_nulls; i++) {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c41a718d/src/parquet/arrow/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 90e037f..6d2f9c0 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -257,39 +257,16 @@ class FileWriter::Impl {
       int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels);
 
   template <typename ParquetType, typename ArrowType>
+  Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values,
+      int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
+      const typename ArrowType::c_type* data_ptr);
+
+  template <typename ParquetType, typename ArrowType>
   Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values,
       int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
       const uint8_t* valid_bits, int64_t valid_bits_offset,
       const typename ArrowType::c_type* data_ptr);
 
-  // TODO(uwe): Same code as in reader.cc the only difference is the name of the temporary
-  // buffer
-  template <typename InType, typename OutType>
-  struct can_copy_ptr {
-    static constexpr bool value =
-        std::is_same<InType, OutType>::value ||
-        (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
-            (sizeof(InType) == sizeof(OutType)));
-  };
-
-  template <typename InType, typename OutType,
-      typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr>
-  Status ConvertPhysicalType(const InType* in_ptr, int64_t, const OutType** out_ptr) {
-    *out_ptr = reinterpret_cast<const OutType*>(in_ptr);
-    return Status::OK();
-  }
-
-  template <typename InType, typename OutType,
-      typename std::enable_if<not can_copy_ptr<InType, OutType>::value>::type* = nullptr>
-  Status ConvertPhysicalType(
-      const InType* in_ptr, int64_t length, const OutType** out_ptr) {
-    RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(OutType)));
-    OutType* mutable_out_ptr = reinterpret_cast<OutType*>(data_buffer_.mutable_data());
-    std::copy(in_ptr, in_ptr + length, mutable_out_ptr);
-    *out_ptr = mutable_out_ptr;
-    return Status::OK();
-  }
-
   Status WriteColumnChunk(const Array& data);
   Status Close();
 
@@ -323,7 +300,6 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
     const std::shared_ptr<Array>& array, int64_t num_levels, const int16_t* def_levels,
     const int16_t* rep_levels) {
   using ArrowCType = typename ArrowType::c_type;
-  using ParquetCType = typename ParquetType::c_type;
 
   auto data = static_cast<const PrimitiveArray*>(array.get());
   auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data());
@@ -331,11 +307,8 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
 
   if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
     // no nulls, just dump the data
-    const ParquetCType* data_writer_ptr = nullptr;
-    RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
-        data_ptr + data->offset(), array->length(), &data_writer_ptr)));
-    PARQUET_CATCH_NOT_OK(
-        writer->WriteBatch(num_levels, def_levels, rep_levels, data_writer_ptr));
+    RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(writer, array->length(),
+        num_levels, def_levels, rep_levels, data_ptr + data->offset())));
   } else {
     const uint8_t* valid_bits = data->null_bitmap_data();
     RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(writer, data->length(),
@@ -347,6 +320,49 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
 }
 
 template <typename ParquetType, typename ArrowType>
+Status FileWriter::Impl::WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer,
+    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels, const typename ArrowType::c_type* data_ptr) {
+  using ParquetCType = typename ParquetType::c_type;
+  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
+  auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
+  std::copy(data_ptr, data_ptr + num_values, buffer_ptr);
+  PARQUET_CATCH_NOT_OK(
+      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+  return Status::OK();
+}
+
+template <>
+Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::DateType>(
+    TypedColumnWriter<Int32Type>* writer, int64_t num_values, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels, const int64_t* data_ptr) {
+  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
+  auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
+  for (int i = 0; i < num_values; i++) {
+    buffer_ptr[i] = static_cast<int32_t>(data_ptr[i] / 86400000);
+  }
+  PARQUET_CATCH_NOT_OK(
+      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+  return Status::OK();
+}
+
+#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                     \
+  template <>                                                                          \
+  Status FileWriter::Impl::WriteNonNullableBatch<ParquetType, ArrowType>(              \
+      TypedColumnWriter<ParquetType> * writer, int64_t num_values, int64_t num_levels, \
+      const int16_t* def_levels, const int16_t* rep_levels, const CType* data_ptr) {   \
+    PARQUET_CATCH_NOT_OK(                                                              \
+        writer->WriteBatch(num_levels, def_levels, rep_levels, data_ptr));             \
+    return Status::OK();                                                               \
+  }
+
+NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
+NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
+NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
+NONNULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
+NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
+
+template <typename ParquetType, typename ArrowType>
 Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writer,
     int64_t num_values, int64_t num_levels, const int16_t* def_levels,
     const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
@@ -368,6 +384,27 @@ Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writ
   return Status::OK();
 }
 
+template <>
+Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::DateType>(
+    TypedColumnWriter<Int32Type>* writer, int64_t num_values, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+    int64_t valid_bits_offset, const int64_t* data_ptr) {
+  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
+  auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
+  INIT_BITSET(valid_bits, valid_bits_offset);
+  for (int i = 0; i < num_values; i++) {
+    if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
+      // Convert from milliseconds into days since the epoch
+      buffer_ptr[i] = static_cast<int32_t>(data_ptr[i] / 86400000);
+    }
+    READ_NEXT_BITSET(valid_bits);
+  }
+  PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
+      num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
+
+  return Status::OK();
+}
+
 #define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                        \
   template <>                                                                          \
   Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>(                 \
@@ -519,6 +556,7 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
       WRITE_BATCH_CASE(INT16, Int16Type, Int32Type)
       WRITE_BATCH_CASE(UINT16, UInt16Type, Int32Type)
       WRITE_BATCH_CASE(INT32, Int32Type, Int32Type)
+      WRITE_BATCH_CASE(DATE, DateType, Int32Type)
       WRITE_BATCH_CASE(INT64, Int64Type, Int64Type)
       WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type)
       WRITE_BATCH_CASE(UINT64, UInt64Type, Int64Type)