Posted to commits@arrow.apache.org by we...@apache.org on 2018/12/15 22:00:34 UTC

[arrow] branch master updated: ARROW-2026: [C++] Enforce use_deprecated_int96_timestamps to all time…

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new ec154d2  ARROW-2026: [C++] Enforce use_deprecated_int96_timestamps to all time…
ec154d2 is described below

commit ec154d232ed5585721e0ef12d61c1a6e2c06fdae
Author: François Saint-Jacques <fs...@gmail.com>
AuthorDate: Sat Dec 15 16:00:27 2018 -0600

    ARROW-2026: [C++] Enforce use_deprecated_int96_timestamps to all time…
    
    …stamps fields.
    
    This changes the behavior of `use_deprecated_int96_timestamps` so that it
    applies to all timestamp fields regardless of their time unit. Previously
    the conversion was only applied to fields with nanosecond resolution.
    
    This option is only needed when writing for systems that support nothing
    but INT96 timestamps; systems that also support INT64 timestamps at other
    resolutions do not need it.
    
    A notable API change is that this option now takes precedence over the
    coerce_timestamps option (see the usage sketch after the file summary
    below).
    
    Author: François Saint-Jacques <fs...@gmail.com>
    
    Closes #3173 from fsaintjacques/ARROW-2026-parquet-int96-conversion and squashes the following commits:
    
    2897a7278 <François Saint-Jacques> ARROW-2026:  Enforce use_deprecated_int96_timestamps to all timestamps fields.
---
 cpp/src/parquet/arrow/arrow-reader-writer-test.cc | 185 +++++++++++-----------
 cpp/src/parquet/arrow/reader.cc                   |  16 +-
 cpp/src/parquet/arrow/schema.cc                   |  73 ++++++---
 cpp/src/parquet/arrow/writer.cc                   |  74 +++++----
 cpp/src/parquet/arrow/writer.h                    |  62 ++++++--
 cpp/src/parquet/types.h                           |  21 +++
 python/pyarrow/parquet.py                         |   4 +-
 python/pyarrow/tests/test_parquet.py              |   5 +-
 8 files changed, 256 insertions(+), 184 deletions(-)
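For readers coming from Python, the behavior described above can be exercised
through pyarrow's existing write_table() options. The following is a minimal,
illustrative sketch only: the file name and values are made up, and
coerce_timestamps is passed solely to show that the INT96 option wins.

    import pyarrow as pa
    import pyarrow.parquet as pq

    # A microsecond-resolution timestamp column. Before this change only
    # nanosecond columns were affected by use_deprecated_int96_timestamps;
    # now every timestamp column is written with the INT96 physical type.
    table = pa.Table.from_arrays(
        [pa.array([1489269000000, 1489270000000], type=pa.timestamp('us'))],
        names=['ts_us'])

    pq.write_table(table, 'int96_example.parquet',
                   use_deprecated_int96_timestamps=True,
                   coerce_timestamps='ms')  # ignored: INT96 takes precedence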

diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index 07124eb..4e62a22 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1193,65 +1193,116 @@ void MakeDateTimeTypesTable(std::shared_ptr<Table>* out, bool nanos_as_micros =
   auto f0 = field("f0", ::arrow::date32());
   auto f1 = field("f1", ::arrow::timestamp(TimeUnit::MILLI));
   auto f2 = field("f2", ::arrow::timestamp(TimeUnit::MICRO));
-  std::shared_ptr<::arrow::Field> f3;
-  if (nanos_as_micros) {
-    f3 = field("f3", ::arrow::timestamp(TimeUnit::MICRO));
-  } else {
-    f3 = field("f3", ::arrow::timestamp(TimeUnit::NANO));
-  }
+  auto f3_unit = nanos_as_micros ? TimeUnit::MICRO : TimeUnit::NANO;
+  auto f3 = field("f3", ::arrow::timestamp(f3_unit));
   auto f4 = field("f4", ::arrow::time32(TimeUnit::MILLI));
   auto f5 = field("f5", ::arrow::time64(TimeUnit::MICRO));
+
   std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1, f2, f3, f4, f5}));
 
   std::vector<int32_t> t32_values = {1489269000, 1489270000, 1489271000,
                                      1489272000, 1489272000, 1489273000};
-  std::vector<int64_t> t64_values = {1489269000000, 1489270000000, 1489271000000,
-                                     1489272000000, 1489272000000, 1489273000000};
+  std::vector<int64_t> t64_ns_values = {1489269000000, 1489270000000, 1489271000000,
+                                        1489272000000, 1489272000000, 1489273000000};
   std::vector<int64_t> t64_us_values = {1489269000, 1489270000, 1489271000,
                                         1489272000, 1489272000, 1489273000};
+  std::vector<int64_t> t64_ms_values = {1489269, 1489270, 1489271,
+                                        1489272, 1489272, 1489273};
 
   std::shared_ptr<Array> a0, a1, a2, a3, a4, a5;
   ArrayFromVector<::arrow::Date32Type, int32_t>(f0->type(), is_valid, t32_values, &a0);
-  ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_values, &a1);
-  ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_values, &a2);
-  if (nanos_as_micros) {
-    ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, t64_us_values,
-                                                     &a3);
-  } else {
-    ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, t64_values,
-                                                     &a3);
-  }
+  ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_ms_values,
+                                                   &a1);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_us_values,
+                                                   &a2);
+  auto f3_data = nanos_as_micros ? t64_us_values : t64_ns_values;
+  ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, f3_data, &a3);
   ArrayFromVector<::arrow::Time32Type, int32_t>(f4->type(), is_valid, t32_values, &a4);
-  ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_values, &a5);
+  ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_us_values, &a5);
 
   std::vector<std::shared_ptr<::arrow::Column>> columns = {
       std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1),
       std::make_shared<Column>("f2", a2), std::make_shared<Column>("f3", a3),
       std::make_shared<Column>("f4", a4), std::make_shared<Column>("f5", a5)};
+
   *out = Table::Make(schema, columns);
 }
 
 TEST(TestArrowReadWrite, DateTimeTypes) {
-  std::shared_ptr<Table> table;
+  std::shared_ptr<Table> table, result;
   MakeDateTimeTypesTable(&table);
 
-  // Use deprecated INT96 type
-  std::shared_ptr<Table> result;
-  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
-      table, false /* use_threads */, table->num_rows(), {}, &result,
-      ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build()));
-
-  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
-
   // Cast nanoseconds to microseconds and use INT64 physical type
   ASSERT_NO_FATAL_FAILURE(
       DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result));
-  std::shared_ptr<Table> expected;
   MakeDateTimeTypesTable(&table, true);
 
   ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
 }
 
+TEST(TestArrowReadWrite, UseDeprecatedInt96) {
+  using ::arrow::ArrayFromVector;
+  using ::arrow::field;
+  using ::arrow::schema;
+
+  std::vector<bool> is_valid = {true, true, true, false, true, true};
+
+  auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
+  auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
+  auto t_us = ::arrow::timestamp(TimeUnit::MICRO);
+  auto t_ns = ::arrow::timestamp(TimeUnit::NANO);
+
+  std::vector<int64_t> s_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273};
+  std::vector<int64_t> ms_values = {1489269000, 1489270000, 1489271000,
+                                    1489272001, 1489272000, 1489273000};
+  std::vector<int64_t> us_values = {1489269000000, 1489270000000, 1489271000000,
+                                    1489272000001, 1489272000000, 1489273000000};
+  std::vector<int64_t> ns_values = {1489269000000000LL, 1489270000000000LL,
+                                    1489271000000000LL, 1489272000000001LL,
+                                    1489272000000000LL, 1489273000000000LL};
+
+  std::shared_ptr<Array> a_s, a_ms, a_us, a_ns;
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns);
+
+  // Each input is typed with a unique TimeUnit
+  auto input_schema = schema(
+      {field("f_s", t_s), field("f_ms", t_ms), field("f_us", t_us), field("f_ns", t_ns)});
+  auto input = Table::Make(
+      input_schema,
+      {std::make_shared<Column>("f_s", a_s), std::make_shared<Column>("f_ms", a_ms),
+       std::make_shared<Column>("f_us", a_us), std::make_shared<Column>("f_ns", a_ns)});
+
+  // When reading parquet files, all int96 schema fields are converted to
+  // timestamp nanoseconds
+  auto ex_schema = schema({field("f_s", t_ns), field("f_ms", t_ns), field("f_us", t_ns),
+                           field("f_ns", t_ns)});
+  auto ex_result = Table::Make(
+      ex_schema,
+      {std::make_shared<Column>("f_s", a_ns), std::make_shared<Column>("f_ms", a_ns),
+       std::make_shared<Column>("f_us", a_ns), std::make_shared<Column>("f_ns", a_ns)});
+
+  std::shared_ptr<Table> result;
+  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
+      input, false /* use_threads */, input->num_rows(), {}, &result,
+      ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build()));
+
+  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
+
+  // Ensure enable_deprecated_int96_timestamps takes precedence over
+  // coerce_timestamps.
+  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(input, false /* use_threads */,
+                                            input->num_rows(), {}, &result,
+                                            ArrowWriterProperties::Builder()
+                                                .enable_deprecated_int96_timestamps()
+                                                ->coerce_timestamps(TimeUnit::MILLI)
+                                                ->build()));
+
+  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
+}
+
 TEST(TestArrowReadWrite, CoerceTimestamps) {
   using ::arrow::ArrayFromVector;
   using ::arrow::field;
@@ -1297,6 +1348,12 @@ TEST(TestArrowReadWrite, CoerceTimestamps) {
       {std::make_shared<Column>("f_s", a_ms), std::make_shared<Column>("f_ms", a_ms),
        std::make_shared<Column>("f_us", a_ms), std::make_shared<Column>("f_ns", a_ms)});
 
+  std::shared_ptr<Table> milli_result;
+  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
+      input, false /* use_threads */, input->num_rows(), {}, &milli_result,
+      ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build()));
+  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_milli_result, *milli_result));
+
   // Result when coercing to microseconds
   auto s3 = std::shared_ptr<::arrow::Schema>(
       new ::arrow::Schema({field("f_s", t_us), field("f_ms", t_us), field("f_us", t_us),
@@ -1306,13 +1363,6 @@ TEST(TestArrowReadWrite, CoerceTimestamps) {
       {std::make_shared<Column>("f_s", a_us), std::make_shared<Column>("f_ms", a_us),
        std::make_shared<Column>("f_us", a_us), std::make_shared<Column>("f_ns", a_us)});
 
-  std::shared_ptr<Table> milli_result;
-  ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
-      input, false /* use_threads */, input->num_rows(), {}, &milli_result,
-      ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build()));
-
-  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_milli_result, *milli_result));
-
   std::shared_ptr<Table> micro_result;
   ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
       input, false /* use_threads */, input->num_rows(), {}, &micro_result,
@@ -1457,65 +1507,6 @@ TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
   ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_table, *result));
 }
 
-// Regression for ARROW-2802
-TEST(TestArrowReadWrite, CoerceTimestampsAndSupportDeprecatedInt96) {
-  using ::arrow::Column;
-  using ::arrow::default_memory_pool;
-  using ::arrow::Field;
-  using ::arrow::Schema;
-  using ::arrow::Table;
-  using ::arrow::TimestampBuilder;
-  using ::arrow::TimestampType;
-  using ::arrow::TimeUnit;
-
-  auto timestamp_type = std::make_shared<TimestampType>(TimeUnit::NANO);
-
-  TimestampBuilder builder(timestamp_type, default_memory_pool());
-  for (std::int64_t ii = 0; ii < 10; ++ii) {
-    ASSERT_OK(builder.Append(1000000000L * ii));
-  }
-  std::shared_ptr<Array> values;
-  ASSERT_OK(builder.Finish(&values));
-
-  std::vector<std::shared_ptr<Field>> fields;
-  auto field = std::make_shared<Field>("nanos", timestamp_type);
-  fields.emplace_back(field);
-
-  auto schema = std::make_shared<Schema>(fields);
-
-  std::vector<std::shared_ptr<Column>> columns;
-  auto column = std::make_shared<Column>("nanos", values);
-  columns.emplace_back(column);
-
-  auto table = Table::Make(schema, columns);
-
-  auto arrow_writer_properties = ArrowWriterProperties::Builder()
-                                     .coerce_timestamps(TimeUnit::MICRO)
-                                     ->enable_deprecated_int96_timestamps()
-                                     ->build();
-
-  std::shared_ptr<Table> result;
-  DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result,
-                    arrow_writer_properties);
-
-  ASSERT_EQ(table->num_columns(), result->num_columns());
-  ASSERT_EQ(table->num_rows(), result->num_rows());
-
-  auto actual_column = result->column(0);
-  auto data = actual_column->data();
-  auto expected_values =
-      static_cast<::arrow::NumericArray<TimestampType>*>(values.get())->raw_values();
-  for (int ii = 0; ii < data->num_chunks(); ++ii) {
-    auto chunk =
-        static_cast<::arrow::NumericArray<TimestampType>*>(data->chunk(ii).get());
-    auto values = chunk->raw_values();
-    for (int64_t jj = 0; jj < chunk->length(); ++jj, ++expected_values) {
-      // Check that the nanos have been converted to micros
-      ASSERT_EQ(*expected_values / 1000, values[jj]);
-    }
-  }
-}
-
 void MakeDoubleTable(int num_columns, int num_rows, int nchunks,
                      std::shared_ptr<Table>* out) {
   std::shared_ptr<::arrow::Column> column;
@@ -2289,11 +2280,13 @@ TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) {
 INSTANTIATE_TEST_CASE_P(Repetition_type, TestNestedSchemaRead,
                         ::testing::Values(Repetition::REQUIRED, Repetition::OPTIONAL));
 
-TEST(TestImpalaConversion, NanosecondToImpala) {
+TEST(TestImpalaConversion, ArrowTimestampToImpalaTimestamp) {
   // June 20, 2017 16:32:56 and 123456789 nanoseconds
   int64_t nanoseconds = INT64_C(1497976376123456789);
-  Int96 expected = {{UINT32_C(632093973), UINT32_C(13871), UINT32_C(2457925)}};
+
   Int96 calculated;
+
+  Int96 expected = {{UINT32_C(632093973), UINT32_C(13871), UINT32_C(2457925)}};
   internal::NanosecondsToImpalaTimestamp(nanoseconds, &calculated);
   ASSERT_EQ(expected, calculated);
 }
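As a cross-check on the ArrowTimestampToImpalaTimestamp expectation above, the
Int96 value can be reproduced by hand from the constants introduced in
types.h later in this patch (hand-worked arithmetic, not part of the tests):

    1497976376123456789 ns / 86400000000000 ns-per-day = 17337 days
        17337 + 2440588 (Julian day of the Unix epoch)  = 2457925 = value[2]
    1497976376123456789 % 86400000000000 = 59576123456789 ns within the day
        59576123456789 = 13871 * 2^32 + 632093973
        -> value[1] = 13871, value[0] = 632093973 (lower 64 bits, little-endian)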
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 2a7730d..7830b6a 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -77,18 +77,6 @@ namespace arrow {
 
 using ::arrow::BitUtil::BytesForBits;
 
-constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
-constexpr int64_t kMillisecondsInADay = 86400000LL;
-constexpr int64_t kNanosecondsInADay = kMillisecondsInADay * 1000LL * 1000LL;
-
-static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timestamp) {
-  int64_t days_since_epoch = impala_timestamp.value[2] - kJulianToUnixEpochDays;
-  int64_t nanoseconds = 0;
-
-  memcpy(&nanoseconds, &impala_timestamp.value, sizeof(int64_t));
-  return days_since_epoch * kNanosecondsInADay + nanoseconds;
-}
-
 template <typename ArrowType>
 using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
 
@@ -1045,7 +1033,7 @@ struct TransferFunctor<::arrow::TimestampType, Int96Type> {
 
     auto data_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
     for (int64_t i = 0; i < length; i++) {
-      *data_ptr++ = impala_timestamp_to_nanoseconds(values[i]);
+      *data_ptr++ = Int96GetNanoSeconds(values[i]);
     }
 
     if (reader->nullable_values()) {
@@ -1072,7 +1060,7 @@ struct TransferFunctor<::arrow::Date64Type, Int32Type> {
     auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
 
     for (int64_t i = 0; i < length; i++) {
-      *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsInADay;
+      *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsPerDay;
     }
 
     if (reader->nullable_values()) {
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index d0014a6..af9fbc9 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -423,45 +423,66 @@ Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
   return Status::OK();
 }
 
+static LogicalType::type LogicalTypeFromArrowTimeUnit(::arrow::TimeUnit::type time_unit) {
+  switch (time_unit) {
+    case ::arrow::TimeUnit::MILLI:
+      return LogicalType::TIMESTAMP_MILLIS;
+    case ::arrow::TimeUnit::MICRO:
+      return LogicalType::TIMESTAMP_MICROS;
+    case ::arrow::TimeUnit::SECOND:
+    case ::arrow::TimeUnit::NANO:
+      // No equivalent parquet logical type.
+      break;
+  }
+
+  return LogicalType::NONE;
+}
+
 static Status GetTimestampMetadata(const ::arrow::TimestampType& type,
                                    const ArrowWriterProperties& properties,
                                    ParquetType::type* physical_type,
                                    LogicalType::type* logical_type) {
-  auto unit = type.unit();
-  *physical_type = ParquetType::INT64;
+  const bool coerce = properties.coerce_timestamps_enabled();
+  const auto unit = coerce ? properties.coerce_timestamps_unit() : type.unit();
 
-  if (properties.coerce_timestamps_enabled()) {
-    auto coerce_unit = properties.coerce_timestamps_unit();
-    if (coerce_unit == ::arrow::TimeUnit::MILLI) {
-      *logical_type = LogicalType::TIMESTAMP_MILLIS;
-    } else if (coerce_unit == ::arrow::TimeUnit::MICRO) {
-      *logical_type = LogicalType::TIMESTAMP_MICROS;
-    } else {
-      return Status::NotImplemented(
-          "Can only coerce Arrow timestamps to milliseconds"
-          " or microseconds");
+  // The user is explicitly asking for Impala int96 encoding; there is no
+  // logical type.
+  if (properties.support_deprecated_int96_timestamps()) {
+    *physical_type = ParquetType::INT96;
+    return Status::OK();
+  }
+
+  *physical_type = ParquetType::INT64;
+  *logical_type = LogicalTypeFromArrowTimeUnit(unit);
+
+  // The user is requesting that all timestamp columns be cast to a specific
+  // type. Only two TimeUnits are supported by arrow-parquet.
+  if (coerce) {
+    switch (unit) {
+      case ::arrow::TimeUnit::MILLI:
+      case ::arrow::TimeUnit::MICRO:
+        break;
+      case ::arrow::TimeUnit::NANO:
+      case ::arrow::TimeUnit::SECOND:
+        return Status::NotImplemented(
+            "Can only coerce Arrow timestamps to milliseconds"
+            " or microseconds");
     }
+
     return Status::OK();
   }
 
-  if (unit == ::arrow::TimeUnit::MILLI) {
-    *logical_type = LogicalType::TIMESTAMP_MILLIS;
-  } else if (unit == ::arrow::TimeUnit::MICRO) {
+  // Until ARROW-3729 is resolved, nanoseconds are explicitly converted to
+  // int64 microseconds when deprecated int96 is not requested.
+  if (type.unit() == ::arrow::TimeUnit::NANO)
     *logical_type = LogicalType::TIMESTAMP_MICROS;
-  } else if (unit == ::arrow::TimeUnit::NANO) {
-    if (properties.support_deprecated_int96_timestamps()) {
-      *physical_type = ParquetType::INT96;
-      // No corresponding logical type
-    } else {
-      *logical_type = LogicalType::TIMESTAMP_MICROS;
-    }
-  } else {
+  else if (type.unit() == ::arrow::TimeUnit::SECOND)
     return Status::NotImplemented(
         "Only MILLI, MICRO, and NANOS units supported for Arrow timestamps with "
         "Parquet.");
-  }
+
   return Status::OK();
-}
+}  // namespace arrow
 
 Status FieldToNode(const std::shared_ptr<Field>& field,
                    const WriterProperties& properties,
@@ -698,7 +719,7 @@ int32_t DecimalSize(int32_t precision) {
   }
   DCHECK(false);
   return -1;
-}
+}  // namespace arrow
 
 }  // namespace arrow
 }  // namespace parquet
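For quick reference, the behavior implemented by GetTimestampMetadata above
works out as follows (summarized from the code in this hunk, per timestamp
column):

    writer properties                    physical type   logical type
    int96 enabled (any unit)             INT96           none
    coerce to MILLI or MICRO             INT64           TIMESTAMP_MILLIS / _MICROS
    coerce to SECOND or NANO             NotImplemented error
    no coercion, unit MILLI or MICRO     INT64           TIMESTAMP_MILLIS / _MICROS
    no coercion, unit NANO               INT64           TIMESTAMP_MICROS (ARROW-3729)
    no coercion, unit SECOND             NotImplemented error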
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 402cbf0..bce9f37 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -386,7 +386,11 @@ class ArrowColumnWriter {
   Status WriteBatch(int64_t num_levels, const int16_t* def_levels,
                     const int16_t* rep_levels,
                     const typename ParquetType::c_type* values) {
-    auto typed_writer = static_cast<TypedColumnWriter<ParquetType>*>(writer_);
+    auto typed_writer =
+        ::arrow::internal::checked_cast<TypedColumnWriter<ParquetType>*>(writer_);
+    // WriteBatch was called with a type that does not match writer_'s type.
+    // This could be a schema conversion problem.
+    DCHECK(typed_writer);
     PARQUET_CATCH_NOT_OK(
         typed_writer->WriteBatch(num_levels, def_levels, rep_levels, values));
     return Status::OK();
@@ -397,7 +401,11 @@ class ArrowColumnWriter {
                           const int16_t* rep_levels, const uint8_t* valid_bits,
                           int64_t valid_bits_offset,
                           const typename ParquetType::c_type* values) {
-    auto typed_writer = static_cast<TypedColumnWriter<ParquetType>*>(writer_);
+    auto typed_writer =
+        ::arrow::internal::checked_cast<TypedColumnWriter<ParquetType>*>(writer_);
+    // WriteBatchSpaced was called with a type that does not match writer_'s
+    // type. This could be a schema conversion problem.
+    DCHECK(typed_writer);
     PARQUET_CATCH_NOT_OK(typed_writer->WriteBatchSpaced(
         num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, values));
     return Status::OK();
@@ -570,20 +578,42 @@ NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
 NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
 NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
 
+#define CONV_CASE_LOOP(ConversionFunction) \
+  for (int64_t i = 0; i < num_values; i++) \
+    ConversionFunction(arrow_values[i], &output[i]);
+
+static void ConvertArrowTimestampToParquetInt96(const int64_t* arrow_values,
+                                                int64_t num_values,
+                                                ::arrow::TimeUnit ::type unit_type,
+                                                Int96* output) {
+  switch (unit_type) {
+    case TimeUnit::NANO:
+      CONV_CASE_LOOP(internal::NanosecondsToImpalaTimestamp);
+      break;
+    case TimeUnit::MICRO:
+      CONV_CASE_LOOP(internal::MicrosecondsToImpalaTimestamp);
+      break;
+    case TimeUnit::MILLI:
+      CONV_CASE_LOOP(internal::MillisecondsToImpalaTimestamp);
+      break;
+    case TimeUnit::SECOND:
+      CONV_CASE_LOOP(internal::SecondsToImpalaTimestamp);
+      break;
+  }
+}
+
+#undef CONV_CASE_LOOP
+
 template <>
 Status ArrowColumnWriter::WriteNullableBatch<Int96Type, ::arrow::TimestampType>(
     const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
     const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
     int64_t valid_bits_offset, const int64_t* values) {
-  Int96* buffer;
+  Int96* buffer = nullptr;
   RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));
-  if (type.unit() == TimeUnit::NANO) {
-    for (int i = 0; i < num_values; i++) {
-      internal::NanosecondsToImpalaTimestamp(values[i], &buffer[i]);
-    }
-  } else {
-    return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
-  }
+
+  ConvertArrowTimestampToParquetInt96(values, num_values, type.unit(), buffer);
+
   return WriteBatchSpaced<Int96Type>(num_levels, def_levels, rep_levels, valid_bits,
                                      valid_bits_offset, buffer);
 }
@@ -592,15 +622,11 @@ template <>
 Status ArrowColumnWriter::WriteNonNullableBatch<Int96Type, ::arrow::TimestampType>(
     const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
     const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) {
-  Int96* buffer;
+  Int96* buffer = nullptr;
   RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));
-  if (type.unit() == TimeUnit::NANO) {
-    for (int i = 0; i < num_values; i++) {
-      internal::NanosecondsToImpalaTimestamp(values[i], buffer + i);
-    }
-  } else {
-    return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
-  }
+
+  ConvertArrowTimestampToParquetInt96(values, num_values, type.unit(), buffer);
+
   return WriteBatch<Int96Type>(num_levels, def_levels, rep_levels, buffer);
 }
 
@@ -611,21 +637,15 @@ Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_level
 
   const bool is_nanosecond = type.unit() == TimeUnit::NANO;
 
-  // In the case where support_deprecated_int96_timestamps was specified
-  // and coerce_timestamps_enabled was specified, a nanosecond column
-  // will have a physical type of int64. In that case, we fall through
-  // to the else if below.
-  //
-  // See https://issues.apache.org/jira/browse/ARROW-2082
-  if (is_nanosecond && ctx_->properties->support_deprecated_int96_timestamps() &&
-      !ctx_->properties->coerce_timestamps_enabled()) {
+  if (ctx_->properties->support_deprecated_int96_timestamps()) {
+    // The user explicitly requested Int96 storage.
     return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(values, num_levels,
                                                               def_levels, rep_levels);
   } else if (is_nanosecond ||
              (ctx_->properties->coerce_timestamps_enabled() &&
               (type.unit() != ctx_->properties->coerce_timestamps_unit()))) {
     // Casting is required. This covers several cases
-    // * Nanoseconds -> cast to microseconds
+    // * Nanoseconds -> cast to microseconds (until ARROW-3729 is resolved)
     // * coerce_timestamps_enabled_, cast all timestamps to requested unit
     return WriteTimestampsCoerce(ctx_->properties->truncated_timestamps_allowed(), values,
                                  num_levels, def_levels, rep_levels);
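In short, the simplified WriteTimestamps above now chooses the write path in
this order (as read from the code; property names are those defined in
writer.h below): if support_deprecated_int96_timestamps() is set, every
timestamp column takes the Int96 path regardless of its unit; otherwise
nanosecond columns, and columns whose unit differs from an enabled
coerce_timestamps unit, take the coercion path; all remaining columns are
written directly as INT64.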
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 2538c02..50cb4cf 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -45,19 +45,19 @@ class PARQUET_EXPORT ArrowWriterProperties {
   class Builder {
    public:
     Builder()
-        : write_nanos_as_int96_(false),
+        : write_timestamps_as_int96_(false),
           coerce_timestamps_enabled_(false),
           coerce_timestamps_unit_(::arrow::TimeUnit::SECOND),
           truncated_timestamps_allowed_(false) {}
     virtual ~Builder() {}
 
     Builder* disable_deprecated_int96_timestamps() {
-      write_nanos_as_int96_ = false;
+      write_timestamps_as_int96_ = false;
       return this;
     }
 
     Builder* enable_deprecated_int96_timestamps() {
-      write_nanos_as_int96_ = true;
+      write_timestamps_as_int96_ = true;
       return this;
     }
 
@@ -79,19 +79,19 @@ class PARQUET_EXPORT ArrowWriterProperties {
 
     std::shared_ptr<ArrowWriterProperties> build() {
       return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties(
-          write_nanos_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_,
+          write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_,
           truncated_timestamps_allowed_));
     }
 
    private:
-    bool write_nanos_as_int96_;
+    bool write_timestamps_as_int96_;
 
     bool coerce_timestamps_enabled_;
     ::arrow::TimeUnit::type coerce_timestamps_unit_;
     bool truncated_timestamps_allowed_;
   };
 
-  bool support_deprecated_int96_timestamps() const { return write_nanos_as_int96_; }
+  bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; }
 
   bool coerce_timestamps_enabled() const { return coerce_timestamps_enabled_; }
   ::arrow::TimeUnit::type coerce_timestamps_unit() const {
@@ -105,12 +105,12 @@ class PARQUET_EXPORT ArrowWriterProperties {
                                  bool coerce_timestamps_enabled,
                                  ::arrow::TimeUnit::type coerce_timestamps_unit,
                                  bool truncated_timestamps_allowed)
-      : write_nanos_as_int96_(write_nanos_as_int96),
+      : write_timestamps_as_int96_(write_nanos_as_int96),
         coerce_timestamps_enabled_(coerce_timestamps_enabled),
         coerce_timestamps_unit_(coerce_timestamps_unit),
         truncated_timestamps_allowed_(truncated_timestamps_allowed) {}
 
-  const bool write_nanos_as_int96_;
+  const bool write_timestamps_as_int96_;
   const bool coerce_timestamps_enabled_;
   const ::arrow::TimeUnit::type coerce_timestamps_unit_;
   const bool truncated_timestamps_allowed_;
@@ -208,24 +208,52 @@ namespace internal {
  * Timestamp conversion constants
  */
 constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588);
-constexpr int64_t kNanosecondsPerDay = INT64_C(86400000000000);
 
-/**
- * Converts nanosecond timestamps to Impala (Int96) format
- */
-inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds,
-                                         Int96* impala_timestamp) {
-  int64_t julian_days = (nanoseconds / kNanosecondsPerDay) + kJulianEpochOffsetDays;
+template <int64_t UnitPerDay, int64_t NanosecondsPerUnit>
+inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) {
+  int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays;
   (*impala_timestamp).value[2] = (uint32_t)julian_days;
 
-  int64_t last_day_nanos = nanoseconds % kNanosecondsPerDay;
+  int64_t last_day_units = time % UnitPerDay;
   int64_t* impala_last_day_nanos = reinterpret_cast<int64_t*>(impala_timestamp);
-  *impala_last_day_nanos = last_day_nanos;
+  *impala_last_day_nanos = last_day_units * NanosecondsPerUnit;
+}
+
+constexpr int64_t kSecondsInNanos = INT64_C(1000000000);
+
+inline void SecondsToImpalaTimestamp(const int64_t seconds, Int96* impala_timestamp) {
+  ArrowTimestampToImpalaTimestamp<kSecondsPerDay, kSecondsInNanos>(seconds,
+                                                                   impala_timestamp);
+}
+
+constexpr int64_t kMillisecondsInNanos = kSecondsInNanos / INT64_C(1000);
+
+inline void MillisecondsToImpalaTimestamp(const int64_t milliseconds,
+                                          Int96* impala_timestamp) {
+  ArrowTimestampToImpalaTimestamp<kMillisecondsPerDay, kMillisecondsInNanos>(
+      milliseconds, impala_timestamp);
+}
+
+constexpr int64_t kMicrosecondsInNanos = kMillisecondsInNanos / INT64_C(1000);
+
+inline void MicrosecondsToImpalaTimestamp(const int64_t microseconds,
+                                          Int96* impala_timestamp) {
+  ArrowTimestampToImpalaTimestamp<kMicrosecondsPerDay, kMicrosecondsInNanos>(
+      microseconds, impala_timestamp);
+}
+
+constexpr int64_t kNanosecondsInNanos = INT64_C(1);
+
+inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds,
+                                         Int96* impala_timestamp) {
+  ArrowTimestampToImpalaTimestamp<kNanosecondsPerDay, kNanosecondsInNanos>(
+      nanoseconds, impala_timestamp);
 }
 
 }  // namespace internal
 
 }  // namespace arrow
+
 }  // namespace parquet
 
 #endif  // PARQUET_ARROW_WRITER_H
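A quick hand-worked sanity check of the generalized helper above (using the
per-day constants added to types.h below): SecondsToImpalaTimestamp(86401),
i.e. one day and one second after the Unix epoch, instantiates the template
with UnitPerDay = 86400 and NanosecondsPerUnit = 1000000000, giving
value[2] = 86401 / 86400 + 2440588 = 2440589 and a lower-64-bit payload of
(86401 % 86400) * 1000000000 = 1000000000 nanoseconds.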
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index b277180..1812f55 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -175,6 +175,19 @@ struct FixedLenByteArray {
 
 using FLBA = FixedLenByteArray;
 
+// Julian day at unix epoch.
+//
+// The Julian Day Number (JDN) is the integer assigned to a whole solar day in
+// the Julian day count starting from noon Universal time, with Julian day
+// number 0 assigned to the day starting at noon on Monday, January 1, 4713 BC,
+// proleptic Julian calendar (November 24, 4714 BC, in the proleptic Gregorian
+// calendar).
+constexpr int64_t kJulianToUnixEpochDays = INT64_C(2440588);
+constexpr int64_t kSecondsPerDay = INT64_C(60 * 60 * 24);
+constexpr int64_t kMillisecondsPerDay = kSecondsPerDay * INT64_C(1000);
+constexpr int64_t kMicrosecondsPerDay = kMillisecondsPerDay * INT64_C(1000);
+constexpr int64_t kNanosecondsPerDay = kMicrosecondsPerDay * INT64_C(1000);
+
 MANUALLY_ALIGNED_STRUCT(1) Int96 { uint32_t value[3]; };
 STRUCT_END(Int96, 12);
 
@@ -192,6 +205,14 @@ static inline void Int96SetNanoSeconds(parquet::Int96& i96, int64_t nanoseconds)
   std::memcpy(&i96.value, &nanoseconds, sizeof(nanoseconds));
 }
 
+static inline int64_t Int96GetNanoSeconds(const parquet::Int96& i96) {
+  int64_t days_since_epoch = i96.value[2] - kJulianToUnixEpochDays;
+  int64_t nanoseconds = 0;
+
+  memcpy(&nanoseconds, &i96.value, sizeof(int64_t));
+  return days_since_epoch * kNanosecondsPerDay + nanoseconds;
+}
+
 static inline std::string Int96ToString(const Int96& a) {
   std::ostringstream result;
   std::copy(a.value, a.value + 3, std::ostream_iterator<uint32_t>(result, " "));
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index b89145a..feaa890 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -284,8 +284,8 @@ use_dictionary : bool or list
     Specify if we should use dictionary encoding in general or only for
     some columns.
 use_deprecated_int96_timestamps : boolean, default None
-    Write nanosecond resolution timestamps to INT96 Parquet
-    format. Defaults to False unless enabled by flavor argument
+    Write timestamps to INT96 Parquet format. Defaults to False unless enabled
+    by flavor argument. This takes priority over the coerce_timestamps option.
 coerce_timestamps : string, default None
     Cast timestamps to a particular resolution.
     Valid values: {None, 'ms', 'us'}
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 5c27a9b..82c80e9 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -844,7 +844,7 @@ def test_date_time_types():
     a2 = pa.array(data2, type=t2)
 
     t3 = pa.timestamp('us')
-    start = pd.Timestamp('2000-01-01').value / 1000
+    start = pd.Timestamp('2001-01-01').value / 1000
     data3 = np.array([start, start + 1, start + 2], dtype='int64')
     a3 = pa.array(data3, type=t3)
 
@@ -892,8 +892,9 @@ def test_date_time_types():
 
     # date64 as date32
     # time32[s] to time32[ms]
+    # 'timestamp[ms]' is saved as INT96 timestamp
     # 'timestamp[ns]' is saved as INT96 timestamp
-    expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7],
+    expected = pa.Table.from_arrays([a1, a1, a7, a4, a5, ex_a6, a7],
                                     ['date32', 'date64', 'timestamp[us]',
                                      'time32[s]', 'time64[us]',
                                      'time32_from64[s]',