You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/06/19 21:15:35 UTC

[arrow] branch master updated: ARROW-3729: [C++][Parquet] Use logical annotations in Arrow Parquet reader/writer

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new d54425d  ARROW-3729: [C++][Parquet] Use logical annotations in Arrow Parquet reader/writer
d54425d is described below

commit d54425de19b7dbb2764a40355d76d1c785cf64ec
Author: TP Boudreau <tp...@gmail.com>
AuthorDate: Wed Jun 19 16:15:17 2019 -0500

    ARROW-3729: [C++][Parquet] Use logical annotations in Arrow Parquet reader/writer
    
    This PR causes the Arrow Parquet I/O facility to recognize Parquet logical annotations (the LogicalType values defined in parquet.thrift) on read and to generate them (rather than converted types) on write.
    
    Since Parquet logical annotations include nanosecond timestamps, this patch satisfies the feature requested in ARROW-3729.
    
    Author: TP Boudreau <tp...@gmail.com>
    
    Closes #4421 from tpboudreau/ARROW-3729 and squashes the following commits:
    
    464f34817 <TP Boudreau> Fix failing python parquet tests
    4cef109d1 <TP Boudreau> Temporarily remove validity masks from test
    81dc74273 <TP Boudreau> Reintroduce Parquet version 1.0 behavior for timestamps
    fa0c4828f <TP Boudreau> Coerce Arrow second timestamps to Parquet millisecond timestamps by default
    be613c47c <TP Boudreau> Smallish code review fixes
    b5ebdbdb7 <TP Boudreau> Set Parquet isAdjustedToUTC to true for timestamps with non-empty timezones
    c5de42092 <TP Boudreau> Revert "Preserve Arrow timestamp timezones using Parquet file metadata"
    0336eee5f <TP Boudreau> Preserve Arrow timestamp timezones using Parquet file metadata
    f93794352 <TP Boudreau> Use logical annotations in Arrow Parquet reader/writer
---
 cpp/src/parquet/arrow/arrow-reader-writer-test.cc | 407 ++++++++++++++++++----
 cpp/src/parquet/arrow/arrow-schema-test.cc        | 227 +++++++++++-
 cpp/src/parquet/arrow/reader.cc                   |   6 +-
 cpp/src/parquet/arrow/schema.cc                   | 363 ++++++++++++-------
 cpp/src/parquet/arrow/writer.cc                   | 121 ++++---
 cpp/src/parquet/types.cc                          |   3 +
 python/pyarrow/tests/test_parquet.py              | 182 +++++++---
 7 files changed, 1017 insertions(+), 292 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index 5425e7e..83be7ed 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -87,58 +87,76 @@ static constexpr int LARGE_SIZE = 10000;
 
 static constexpr uint32_t kDefaultSeed = 0;
 
-LogicalType::type get_logical_type(const ::DataType& type) {
+std::shared_ptr<const LogicalAnnotation> get_logical_annotation(const ::DataType& type) {
   switch (type.id()) {
     case ArrowId::UINT8:
-      return LogicalType::UINT_8;
+      return LogicalAnnotation::Int(8, false);
     case ArrowId::INT8:
-      return LogicalType::INT_8;
+      return LogicalAnnotation::Int(8, true);
     case ArrowId::UINT16:
-      return LogicalType::UINT_16;
+      return LogicalAnnotation::Int(16, false);
     case ArrowId::INT16:
-      return LogicalType::INT_16;
+      return LogicalAnnotation::Int(16, true);
     case ArrowId::UINT32:
-      return LogicalType::UINT_32;
+      return LogicalAnnotation::Int(32, false);
     case ArrowId::INT32:
-      return LogicalType::INT_32;
+      return LogicalAnnotation::Int(32, true);
     case ArrowId::UINT64:
-      return LogicalType::UINT_64;
+      return LogicalAnnotation::Int(64, false);
     case ArrowId::INT64:
-      return LogicalType::INT_64;
+      return LogicalAnnotation::Int(64, true);
     case ArrowId::STRING:
-      return LogicalType::UTF8;
+      return LogicalAnnotation::String();
     case ArrowId::DATE32:
-      return LogicalType::DATE;
+      return LogicalAnnotation::Date();
     case ArrowId::DATE64:
-      return LogicalType::DATE;
+      return LogicalAnnotation::Date();
     case ArrowId::TIMESTAMP: {
       const auto& ts_type = static_cast<const ::arrow::TimestampType&>(type);
+      const bool adjusted_to_utc = !(ts_type.timezone().empty());
       switch (ts_type.unit()) {
         case TimeUnit::MILLI:
-          return LogicalType::TIMESTAMP_MILLIS;
+          return LogicalAnnotation::Timestamp(adjusted_to_utc,
+                                              LogicalAnnotation::TimeUnit::MILLIS);
         case TimeUnit::MICRO:
-          return LogicalType::TIMESTAMP_MICROS;
+          return LogicalAnnotation::Timestamp(adjusted_to_utc,
+                                              LogicalAnnotation::TimeUnit::MICROS);
+        case TimeUnit::NANO:
+          return LogicalAnnotation::Timestamp(adjusted_to_utc,
+                                              LogicalAnnotation::TimeUnit::NANOS);
         default:
-          DCHECK(false) << "Only MILLI and MICRO units supported for Arrow timestamps "
-                           "with Parquet.";
+          DCHECK(false)
+              << "Only MILLI, MICRO, and NANO units supported for Arrow TIMESTAMP.";
       }
       break;
     }
     case ArrowId::TIME32:
-      return LogicalType::TIME_MILLIS;
-    case ArrowId::TIME64:
-      return LogicalType::TIME_MICROS;
+      return LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MILLIS);
+    case ArrowId::TIME64: {
+      const auto& tm_type = static_cast<const ::arrow::TimeType&>(type);
+      switch (tm_type.unit()) {
+        case TimeUnit::MICRO:
+          return LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MICROS);
+        case TimeUnit::NANO:
+          return LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::NANOS);
+        default:
+          DCHECK(false) << "Only MICRO and NANO units supported for Arrow TIME64.";
+      }
+      break;
+    }
     case ArrowId::DICTIONARY: {
       const ::arrow::DictionaryType& dict_type =
           static_cast<const ::arrow::DictionaryType&>(type);
-      return get_logical_type(*dict_type.value_type());
+      return get_logical_annotation(*dict_type.value_type());
+    }
+    case ArrowId::DECIMAL: {
+      const auto& dec_type = static_cast<const ::arrow::Decimal128Type&>(type);
+      return LogicalAnnotation::Decimal(dec_type.precision(), dec_type.scale());
     }
-    case ArrowId::DECIMAL:
-      return LogicalType::DECIMAL;
     default:
       break;
   }
-  return LogicalType::NONE;
+  return LogicalAnnotation::None();
 }
 
 ParquetType::type get_physical_type(const ::DataType& type) {
@@ -353,6 +371,49 @@ void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray& actual
   }
 }
 
+void DoConfiguredRoundtrip(
+    const std::shared_ptr<Table>& table, int64_t row_group_size,
+    std::shared_ptr<Table>* out,
+    const std::shared_ptr<::parquet::WriterProperties>& parquet_properties =
+        ::parquet::default_writer_properties(),
+    const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+        default_arrow_writer_properties()) {
+  std::shared_ptr<Buffer> buffer;
+
+  auto sink = CreateOutputStream();
+  ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink,
+                                row_group_size, parquet_properties, arrow_properties));
+  ASSERT_OK_NO_THROW(sink->Finish(&buffer));
+
+  std::unique_ptr<FileReader> reader;
+  ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
+                              ::arrow::default_memory_pool(),
+                              ::parquet::default_reader_properties(), nullptr, &reader));
+  ASSERT_OK_NO_THROW(reader->ReadTable(out));
+}
+
+void CheckConfiguredRoundtrip(
+    const std::shared_ptr<Table>& input_table,
+    const std::shared_ptr<Table>& expected_table = nullptr,
+    const std::shared_ptr<::parquet::WriterProperties>& parquet_properties =
+        ::parquet::default_writer_properties(),
+    const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+        default_arrow_writer_properties()) {
+  std::shared_ptr<Table> actual_table;
+  ASSERT_NO_FATAL_FAILURE(DoConfiguredRoundtrip(input_table, input_table->num_rows(),
+                                                &actual_table, parquet_properties,
+                                                arrow_properties));
+  if (expected_table) {
+    ASSERT_NO_FATAL_FAILURE(
+        ::arrow::AssertSchemaEqual(*actual_table->schema(), *expected_table->schema()));
+    ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*actual_table, *expected_table));
+  } else {
+    ASSERT_NO_FATAL_FAILURE(
+        ::arrow::AssertSchemaEqual(*actual_table->schema(), *input_table->schema()));
+    ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*actual_table, *input_table));
+  }
+}
+
 void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, bool use_threads,
                        int64_t row_group_size, const std::vector<int>& column_subset,
                        std::shared_ptr<Table>* out,
@@ -383,14 +444,14 @@ void CheckSimpleRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group
   std::shared_ptr<Table> result;
   DoSimpleRoundtrip(table, false /* use_threads */, row_group_size, {}, &result,
                     arrow_properties);
+  ASSERT_NO_FATAL_FAILURE(
+      ::arrow::AssertSchemaEqual(*table->schema(), *result->schema()));
   ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result, false));
 }
 
 static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::DataType& type,
                                                    Repetition::type repetition) {
   int32_t byte_width = -1;
-  int32_t precision = -1;
-  int32_t scale = -1;
 
   switch (type.id()) {
     case ::arrow::Type::DICTIONARY: {
@@ -404,9 +465,7 @@ static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::DataType& type,
         case ::arrow::Type::DECIMAL: {
           const auto& decimal_type =
               static_cast<const ::arrow::Decimal128Type&>(values_type);
-          precision = decimal_type.precision();
-          scale = decimal_type.scale();
-          byte_width = DecimalSize(precision);
+          byte_width = DecimalSize(decimal_type.precision());
         } break;
         default:
           break;
@@ -417,15 +476,13 @@ static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::DataType& type,
       break;
     case ::arrow::Type::DECIMAL: {
       const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(type);
-      precision = decimal_type.precision();
-      scale = decimal_type.scale();
-      byte_width = DecimalSize(precision);
+      byte_width = DecimalSize(decimal_type.precision());
     } break;
     default:
       break;
   }
-  auto pnode = PrimitiveNode::Make("column1", repetition, get_physical_type(type),
-                                   get_logical_type(type), byte_width, precision, scale);
+  auto pnode = PrimitiveNode::Make("column1", repetition, get_logical_annotation(type),
+                                   get_physical_type(type), byte_width);
   NodePtr node_ =
       GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
   return std::static_pointer_cast<GroupNode>(node_);
@@ -1229,7 +1286,7 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
   ASSERT_NO_FATAL_FAILURE(this->CheckSingleColumnRequiredTableRead(4));
 }
 
-void MakeDateTimeTypesTable(std::shared_ptr<Table>* out, bool nanos_as_micros = false) {
+void MakeDateTimeTypesTable(std::shared_ptr<Table>* out, bool expected = false) {
   using ::arrow::ArrayFromVector;
 
   std::vector<bool> is_valid = {true, true, true, false, true, true};
@@ -1238,12 +1295,14 @@ void MakeDateTimeTypesTable(std::shared_ptr<Table>* out, bool nanos_as_micros =
   auto f0 = field("f0", ::arrow::date32());
   auto f1 = field("f1", ::arrow::timestamp(TimeUnit::MILLI));
   auto f2 = field("f2", ::arrow::timestamp(TimeUnit::MICRO));
-  auto f3_unit = nanos_as_micros ? TimeUnit::MICRO : TimeUnit::NANO;
-  auto f3 = field("f3", ::arrow::timestamp(f3_unit));
+  auto f3 = field("f3", ::arrow::timestamp(TimeUnit::NANO));
+  auto f3_x = field("f3", ::arrow::timestamp(TimeUnit::MICRO));
   auto f4 = field("f4", ::arrow::time32(TimeUnit::MILLI));
   auto f5 = field("f5", ::arrow::time64(TimeUnit::MICRO));
+  auto f6 = field("f6", ::arrow::time64(TimeUnit::NANO));
 
-  std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1, f2, f3, f4, f5}));
+  std::shared_ptr<::arrow::Schema> schema(
+      new ::arrow::Schema({f0, f1, f2, (expected ? f3_x : f3), f4, f5, f6}));
 
   std::vector<int32_t> t32_values = {1489269000, 1489270000, 1489271000,
                                      1489272000, 1489272000, 1489273000};
@@ -1254,34 +1313,42 @@ void MakeDateTimeTypesTable(std::shared_ptr<Table>* out, bool nanos_as_micros =
   std::vector<int64_t> t64_ms_values = {1489269, 1489270, 1489271,
                                         1489272, 1489272, 1489273};
 
-  std::shared_ptr<Array> a0, a1, a2, a3, a4, a5;
+  std::shared_ptr<Array> a0, a1, a2, a3, a3_x, a4, a5, a6;
   ArrayFromVector<::arrow::Date32Type, int32_t>(f0->type(), is_valid, t32_values, &a0);
   ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_ms_values,
                                                    &a1);
   ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_us_values,
                                                    &a2);
-  auto f3_data = nanos_as_micros ? t64_us_values : t64_ns_values;
-  ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, f3_data, &a3);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, t64_ns_values,
+                                                   &a3);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(f3_x->type(), is_valid, t64_us_values,
+                                                   &a3_x);
   ArrayFromVector<::arrow::Time32Type, int32_t>(f4->type(), is_valid, t32_values, &a4);
   ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_us_values, &a5);
+  ArrayFromVector<::arrow::Time64Type, int64_t>(f6->type(), is_valid, t64_ns_values, &a6);
 
   std::vector<std::shared_ptr<::arrow::Column>> columns = {
-      std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1),
-      std::make_shared<Column>("f2", a2), std::make_shared<Column>("f3", a3),
-      std::make_shared<Column>("f4", a4), std::make_shared<Column>("f5", a5)};
+      std::make_shared<Column>("f0", a0),
+      std::make_shared<Column>("f1", a1),
+      std::make_shared<Column>("f2", a2),
+      std::make_shared<Column>("f3", (expected ? a3_x : a3)),
+      std::make_shared<Column>("f4", a4),
+      std::make_shared<Column>("f5", a5),
+      std::make_shared<Column>("f6", a6)};
 
   *out = Table::Make(schema, columns);
 }
 
 TEST(TestArrowReadWrite, DateTimeTypes) {
   std::shared_ptr<Table> table, result;
-  MakeDateTimeTypesTable(&table);
 
-  // Cast nanaoseconds to microseconds and use INT64 physical type
+  MakeDateTimeTypesTable(&table);
   ASSERT_NO_FATAL_FAILURE(
       DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result));
-  MakeDateTimeTypesTable(&table, true);
 
+  MakeDateTimeTypesTable(&table, true);  // build expected result
+  ASSERT_NO_FATAL_FAILURE(
+      ::arrow::AssertSchemaEqual(*table->schema(), *result->schema()));
   ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
 }
 
@@ -1334,6 +1401,8 @@ TEST(TestArrowReadWrite, UseDeprecatedInt96) {
       input, false /* use_threads */, input->num_rows(), {}, &result,
       ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build()));
 
+  ASSERT_NO_FATAL_FAILURE(
+      ::arrow::AssertSchemaEqual(*ex_result->schema(), *result->schema()));
   ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
 
   // Ensure enable_deprecated_int96_timestamps as precedence over
@@ -1345,6 +1414,8 @@ TEST(TestArrowReadWrite, UseDeprecatedInt96) {
                                                 ->coerce_timestamps(TimeUnit::MILLI)
                                                 ->build()));
 
+  ASSERT_NO_FATAL_FAILURE(
+      ::arrow::AssertSchemaEqual(*ex_result->schema(), *result->schema()));
   ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
 }
 
@@ -1352,7 +1423,6 @@ TEST(TestArrowReadWrite, CoerceTimestamps) {
   using ::arrow::ArrayFromVector;
   using ::arrow::field;
 
-  // PARQUET-1078, coerce Arrow timestamps to either TIMESTAMP_MILLIS or TIMESTAMP_MICROS
   std::vector<bool> is_valid = {true, true, true, false, true, true};
 
   auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
@@ -1376,42 +1446,41 @@ TEST(TestArrowReadWrite, CoerceTimestamps) {
   ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns);
 
   // Input table, all data as is
-  auto s1 = std::shared_ptr<::arrow::Schema>(
-      new ::arrow::Schema({field("f_s", t_s), field("f_ms", t_ms), field("f_us", t_us),
-                           field("f_ns", t_ns)}));
+  auto s1 = ::arrow::schema(
+      {field("f_s", t_s), field("f_ms", t_ms), field("f_us", t_us), field("f_ns", t_ns)});
   auto input = Table::Make(
       s1,
       {std::make_shared<Column>("f_s", a_s), std::make_shared<Column>("f_ms", a_ms),
        std::make_shared<Column>("f_us", a_us), std::make_shared<Column>("f_ns", a_ns)});
 
   // Result when coercing to milliseconds
-  auto s2 = std::shared_ptr<::arrow::Schema>(
-      new ::arrow::Schema({field("f_s", t_ms), field("f_ms", t_ms), field("f_us", t_ms),
-                           field("f_ns", t_ms)}));
+  auto s2 = ::arrow::schema({field("f_s", t_ms), field("f_ms", t_ms), field("f_us", t_ms),
+                             field("f_ns", t_ms)});
   auto ex_milli_result = Table::Make(
       s2,
       {std::make_shared<Column>("f_s", a_ms), std::make_shared<Column>("f_ms", a_ms),
        std::make_shared<Column>("f_us", a_ms), std::make_shared<Column>("f_ns", a_ms)});
-
   std::shared_ptr<Table> milli_result;
   ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
       input, false /* use_threads */, input->num_rows(), {}, &milli_result,
       ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build()));
+  ASSERT_NO_FATAL_FAILURE(
+      ::arrow::AssertSchemaEqual(*ex_milli_result->schema(), *milli_result->schema()));
   ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_milli_result, *milli_result));
 
   // Result when coercing to microseconds
-  auto s3 = std::shared_ptr<::arrow::Schema>(
-      new ::arrow::Schema({field("f_s", t_us), field("f_ms", t_us), field("f_us", t_us),
-                           field("f_ns", t_us)}));
+  auto s3 = ::arrow::schema({field("f_s", t_us), field("f_ms", t_us), field("f_us", t_us),
+                             field("f_ns", t_us)});
   auto ex_micro_result = Table::Make(
       s3,
       {std::make_shared<Column>("f_s", a_us), std::make_shared<Column>("f_ms", a_us),
        std::make_shared<Column>("f_us", a_us), std::make_shared<Column>("f_ns", a_us)});
-
   std::shared_ptr<Table> micro_result;
   ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
       input, false /* use_threads */, input->num_rows(), {}, &micro_result,
       ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build()));
+  ASSERT_NO_FATAL_FAILURE(
+      ::arrow::AssertSchemaEqual(*ex_micro_result->schema(), *micro_result->schema()));
   ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_micro_result, *micro_result));
 }
 
@@ -1442,10 +1511,10 @@ TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) {
   ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us);
   ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns);
 
-  auto s1 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_s", t_s)}));
-  auto s2 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_ms", t_ms)}));
-  auto s3 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_us", t_us)}));
-  auto s4 = std::shared_ptr<::arrow::Schema>(new ::arrow::Schema({field("f_ns", t_ns)}));
+  auto s1 = ::arrow::schema({field("f_s", t_s)});
+  auto s2 = ::arrow::schema({field("f_ms", t_ms)});
+  auto s3 = ::arrow::schema({field("f_us", t_us)});
+  auto s4 = ::arrow::schema({field("f_ns", t_ns)});
 
   auto c1 = std::make_shared<Column>("f_s", a_s);
   auto c2 = std::make_shared<Column>("f_ms", a_ms);
@@ -1473,25 +1542,221 @@ TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) {
   ASSERT_RAISES(Invalid, WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10,
                                     default_writer_properties(), coerce_millis));
 
-  // OK to lose precision if we explicitly allow it
-  auto allow_truncation = (ArrowWriterProperties::Builder()
-                               .coerce_timestamps(TimeUnit::MILLI)
-                               ->allow_truncated_timestamps()
-                               ->build());
+  // OK to lose micros/nanos -> millis precision if we explicitly allow it
+  auto allow_truncation_to_millis = (ArrowWriterProperties::Builder()
+                                         .coerce_timestamps(TimeUnit::MILLI)
+                                         ->allow_truncated_timestamps()
+                                         ->build());
   ASSERT_OK_NO_THROW(WriteTable(*t3, ::arrow::default_memory_pool(), sink, 10,
-                                default_writer_properties(), allow_truncation));
+                                default_writer_properties(), allow_truncation_to_millis));
   ASSERT_OK_NO_THROW(WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10,
-                                default_writer_properties(), allow_truncation));
+                                default_writer_properties(), allow_truncation_to_millis));
 
-  // OK to write micros to micros
+  // OK to write to micros
   auto coerce_micros =
       (ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build());
+  ASSERT_OK_NO_THROW(WriteTable(*t1, ::arrow::default_memory_pool(), sink, 10,
+                                default_writer_properties(), coerce_micros));
+  ASSERT_OK_NO_THROW(WriteTable(*t2, ::arrow::default_memory_pool(), sink, 10,
+                                default_writer_properties(), coerce_micros));
   ASSERT_OK_NO_THROW(WriteTable(*t3, ::arrow::default_memory_pool(), sink, 10,
                                 default_writer_properties(), coerce_micros));
 
   // Loss of precision
   ASSERT_RAISES(Invalid, WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10,
                                     default_writer_properties(), coerce_micros));
+
+  // OK to lose nanos -> micros precision if we explicitly allow it
+  auto allow_truncation_to_micros = (ArrowWriterProperties::Builder()
+                                         .coerce_timestamps(TimeUnit::MICRO)
+                                         ->allow_truncated_timestamps()
+                                         ->build());
+  ASSERT_OK_NO_THROW(WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10,
+                                default_writer_properties(), allow_truncation_to_micros));
+}
+
+TEST(TestArrowReadWrite, ImplicitSecondToMillisecondTimestampCoercion) {
+  using ::arrow::ArrayFromVector;
+  using ::arrow::field;
+  using ::arrow::schema;
+
+  std::vector<bool> is_valid = {true, true, true, false, true, true};
+
+  auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
+  auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
+
+  std::vector<int64_t> s_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273};
+  std::vector<int64_t> ms_values = {1489269000, 1489270000, 1489271000,
+                                    1489272000, 1489272000, 1489273000};
+
+  std::shared_ptr<Array> a_s, a_ms;
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms);
+
+  auto si = schema({field("timestamp", t_s)});
+  auto sx = schema({field("timestamp", t_ms)});
+
+  auto ci = std::make_shared<Column>("timestamp", a_s);
+  auto cx = std::make_shared<Column>("timestamp", a_ms);
+
+  auto ti = Table::Make(si, {ci});  // input
+  auto tx = Table::Make(sx, {cx});  // expected output
+  std::shared_ptr<Table> to;        // actual output
+
+  // default properties (without explicit coercion instructions) used ...
+  ASSERT_NO_FATAL_FAILURE(
+      DoSimpleRoundtrip(ti, false /* use_threads */, ti->num_rows(), {}, &to));
+  ASSERT_NO_FATAL_FAILURE(::arrow::AssertSchemaEqual(*tx->schema(), *to->schema()));
+  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*tx, *to));
+}
+
+TEST(TestArrowReadWrite, ParquetVersionTimestampDifferences) {
+  using ::arrow::ArrayFromVector;
+  using ::arrow::field;
+  using ::arrow::schema;
+
+  auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
+  auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
+  auto t_us = ::arrow::timestamp(TimeUnit::MICRO);
+  auto t_ns = ::arrow::timestamp(TimeUnit::NANO);
+
+  const int N = 24;
+  int64_t instant = INT64_C(1262304000);  // 2010-01-01T00:00:00 seconds offset
+  std::vector<int64_t> d_s, d_ms, d_us, d_ns;
+  for (int i = 0; i < N; ++i) {
+    d_s.push_back(instant);
+    d_ms.push_back(instant * INT64_C(1000));
+    d_us.push_back(instant * INT64_C(1000000));
+    d_ns.push_back(instant * INT64_C(1000000000));
+    instant += 3600;
+  }
+
+  std::shared_ptr<Array> a_s, a_ms, a_us, a_ns;
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, d_s, &a_s);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, d_ms, &a_ms);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, d_us, &a_us);
+  ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, d_ns, &a_ns);
+
+  auto c_s = std::make_shared<Column>("ts:s", a_s);
+  auto c_ms = std::make_shared<Column>("ts:ms", a_ms);
+  auto c_us = std::make_shared<Column>("ts:us", a_us);
+  auto c_ns = std::make_shared<Column>("ts:ns", a_ns);
+
+  auto input_schema = schema({field("ts:s", t_s), field("ts:ms", t_ms),
+                              field("ts:us", t_us), field("ts:ns", t_ns)});
+  auto input_table = Table::Make(input_schema, {c_s, c_ms, c_us, c_ns});
+
+  auto parquet_version_1_properties = ::parquet::default_writer_properties();
+  auto parquet_version_2_properties = ::parquet::WriterProperties::Builder()
+                                          .version(ParquetVersion::PARQUET_2_0)
+                                          ->build();
+
+  {
+    // Using Parquet version 1.0 defaults, seconds should be coerced to milliseconds
+    // and nanoseconds should be coerced to microseconds
+    auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms),
+                                   field("ts:us", t_us), field("ts:ns", t_us)});
+    auto expected_table = Table::Make(expected_schema, {c_ms, c_ms, c_us, c_us});
+    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
+                                                     parquet_version_1_properties));
+  }
+  {
+    // Using Parquet version 2.0 defaults, seconds should be coerced to milliseconds
+    // and nanoseconds should be retained
+    auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms),
+                                   field("ts:us", t_us), field("ts:ns", t_ns)});
+    auto expected_table = Table::Make(expected_schema, {c_ms, c_ms, c_us, c_ns});
+    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
+                                                     parquet_version_2_properties));
+  }
+
+  auto arrow_coerce_to_seconds_properties =
+      ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::SECOND)->build();
+  auto arrow_coerce_to_millis_properties =
+      ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build();
+  auto arrow_coerce_to_micros_properties =
+      ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build();
+  auto arrow_coerce_to_nanos_properties =
+      ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::NANO)->build();
+  {
+    // Neither Parquet version 1.0 nor 2.0 allow coercing to seconds
+    auto sink = CreateOutputStream();
+    std::shared_ptr<Table> actual_table;
+    ASSERT_RAISES(NotImplemented,
+                  WriteTable(*input_table, ::arrow::default_memory_pool(), sink,
+                             input_table->num_rows(), parquet_version_1_properties,
+                             arrow_coerce_to_seconds_properties));
+    ASSERT_RAISES(NotImplemented,
+                  WriteTable(*input_table, ::arrow::default_memory_pool(), sink,
+                             input_table->num_rows(), parquet_version_2_properties,
+                             arrow_coerce_to_seconds_properties));
+  }
+  {
+    // Using Parquet version 1.0, coercing to milliseconds or microseconds is allowed
+    auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms),
+                                   field("ts:us", t_ms), field("ts:ns", t_ms)});
+    auto expected_table = Table::Make(expected_schema, {c_ms, c_ms, c_ms, c_ms});
+    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
+                                                     parquet_version_1_properties,
+                                                     arrow_coerce_to_millis_properties));
+
+    expected_schema = schema({field("ts:s", t_us), field("ts:ms", t_us),
+                              field("ts:us", t_us), field("ts:ns", t_us)});
+    expected_table = Table::Make(expected_schema, {c_us, c_us, c_us, c_us});
+    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
+                                                     parquet_version_1_properties,
+                                                     arrow_coerce_to_micros_properties));
+  }
+  {
+    // Using Parquet version 2.0, coercing to milliseconds or microseconds is allowed
+    auto expected_schema = schema({field("ts:s", t_ms), field("ts:ms", t_ms),
+                                   field("ts:us", t_ms), field("ts:ns", t_ms)});
+    auto expected_table = Table::Make(expected_schema, {c_ms, c_ms, c_ms, c_ms});
+    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
+                                                     parquet_version_2_properties,
+                                                     arrow_coerce_to_millis_properties));
+
+    expected_schema = schema({field("ts:s", t_us), field("ts:ms", t_us),
+                              field("ts:us", t_us), field("ts:ns", t_us)});
+    expected_table = Table::Make(expected_schema, {c_us, c_us, c_us, c_us});
+    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
+                                                     parquet_version_2_properties,
+                                                     arrow_coerce_to_micros_properties));
+  }
+  {
+    // Using Parquet version 1.0, coercing to (int64) nanoseconds is not allowed
+    auto sink = CreateOutputStream();
+    std::shared_ptr<Table> actual_table;
+    ASSERT_RAISES(NotImplemented,
+                  WriteTable(*input_table, ::arrow::default_memory_pool(), sink,
+                             input_table->num_rows(), parquet_version_1_properties,
+                             arrow_coerce_to_nanos_properties));
+  }
+  {
+    // Using Parquet version 2.0, coercing to (int64) nanoseconds is allowed
+    auto expected_schema = schema({field("ts:s", t_ns), field("ts:ms", t_ns),
+                                   field("ts:us", t_ns), field("ts:ns", t_ns)});
+    auto expected_table = Table::Make(expected_schema, {c_ns, c_ns, c_ns, c_ns});
+    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
+                                                     parquet_version_2_properties,
+                                                     arrow_coerce_to_nanos_properties));
+  }
+
+  auto arrow_enable_int96_properties =
+      ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build();
+  {
+    // For either Parquet version, coercing to nanoseconds is allowed if Int96
+    // storage is used
+    auto expected_schema = schema({field("ts:s", t_ns), field("ts:ms", t_ns),
+                                   field("ts:us", t_ns), field("ts:ns", t_ns)});
+    auto expected_table = Table::Make(expected_schema, {c_ns, c_ns, c_ns, c_ns});
+    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
+                                                     parquet_version_1_properties,
+                                                     arrow_enable_int96_properties));
+    ASSERT_NO_FATAL_FAILURE(CheckConfiguredRoundtrip(input_table, expected_table,
+                                                     parquet_version_2_properties,
+                                                     arrow_enable_int96_properties));
+  }
 }
 
 TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
@@ -1549,6 +1814,8 @@ TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
   ASSERT_NO_FATAL_FAILURE(
       DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result));
 
+  ASSERT_NO_FATAL_FAILURE(
+      ::arrow::AssertSchemaEqual(*ex_table->schema(), *result->schema()));
   ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_table, *result));
 }
 
diff --git a/cpp/src/parquet/arrow/arrow-schema-test.cc b/cpp/src/parquet/arrow/arrow-schema-test.cc
index b806782..cedabdb 100644
--- a/cpp/src/parquet/arrow/arrow-schema-test.cc
+++ b/cpp/src/parquet/arrow/arrow-schema-test.cc
@@ -33,6 +33,7 @@ using arrow::Field;
 using arrow::TimeUnit;
 
 using ParquetType = parquet::Type;
+using parquet::LogicalAnnotation;
 using parquet::LogicalType;
 using parquet::Repetition;
 using parquet::schema::GroupNode;
@@ -115,12 +116,14 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
   parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
                                                ParquetType::INT64,
                                                LogicalType::TIMESTAMP_MILLIS));
-  arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
+  arrow_fields.push_back(std::make_shared<Field>(
+      "timestamp", ::arrow::timestamp(TimeUnit::MILLI, "UTC"), false));
 
   parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED,
                                                ParquetType::INT64,
                                                LogicalType::TIMESTAMP_MICROS));
-  arrow_fields.push_back(std::make_shared<Field>("timestamp[us]", TIMESTAMP_US, false));
+  arrow_fields.push_back(std::make_shared<Field>(
+      "timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO, "UTC"), false));
 
   parquet_fields.push_back(PrimitiveNode::Make("date", Repetition::REQUIRED,
                                                ParquetType::INT32, LogicalType::DATE));
@@ -168,6 +171,103 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
 }
 
+TEST_F(TestConvertParquetSchema, ParquetAnnotatedFields) {  // Parquet annotation + physical type -> expected Arrow type
+  struct FieldConstructionArguments {  // one table row: Parquet node inputs plus the Arrow type expected back
+    std::string name;
+    std::shared_ptr<const LogicalAnnotation> annotation;  // logical annotation placed on the Parquet node
+    parquet::Type::type physical_type;
+    int physical_length;  // passed to PrimitiveNode::Make below; -1 in every non-FIXED_LEN_BYTE_ARRAY row
+    std::shared_ptr<::arrow::DataType> datatype;  // Arrow type the conversion is expected to produce
+  };
+
+  std::vector<FieldConstructionArguments> cases = {
+      {"string", LogicalAnnotation::String(), ParquetType::BYTE_ARRAY, -1,
+       ::arrow::utf8()},
+      {"enum", LogicalAnnotation::Enum(), ParquetType::BYTE_ARRAY, -1, ::arrow::binary()},
+      {"decimal(8, 2)", LogicalAnnotation::Decimal(8, 2), ParquetType::INT32, -1,
+       ::arrow::decimal(8, 2)},
+      {"decimal(16, 4)", LogicalAnnotation::Decimal(16, 4), ParquetType::INT64, -1,
+       ::arrow::decimal(16, 4)},
+      {"decimal(32, 8)", LogicalAnnotation::Decimal(32, 8),
+       ParquetType::FIXED_LEN_BYTE_ARRAY, 16, ::arrow::decimal(32, 8)},
+      {"date", LogicalAnnotation::Date(), ParquetType::INT32, -1, ::arrow::date32()},
+      {"time(ms)", LogicalAnnotation::Time(true, LogicalAnnotation::TimeUnit::MILLIS),
+       ParquetType::INT32, -1, ::arrow::time32(::arrow::TimeUnit::MILLI)},
+      {"time(us)", LogicalAnnotation::Time(true, LogicalAnnotation::TimeUnit::MICROS),
+       ParquetType::INT64, -1, ::arrow::time64(::arrow::TimeUnit::MICRO)},
+      {"time(ns)", LogicalAnnotation::Time(true, LogicalAnnotation::TimeUnit::NANOS),
+       ParquetType::INT64, -1, ::arrow::time64(::arrow::TimeUnit::NANO)},
+      {"time(ms)", LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MILLIS),
+       ParquetType::INT32, -1, ::arrow::time32(::arrow::TimeUnit::MILLI)},  // Time(false, ...) rows expect the same Arrow types as the Time(true, ...) rows
+      {"time(us)", LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MICROS),
+       ParquetType::INT64, -1, ::arrow::time64(::arrow::TimeUnit::MICRO)},
+      {"time(ns)", LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::NANOS),
+       ParquetType::INT64, -1, ::arrow::time64(::arrow::TimeUnit::NANO)},
+      {"timestamp(true, ms)",
+       LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MILLIS),
+       ParquetType::INT64, -1, ::arrow::timestamp(::arrow::TimeUnit::MILLI, "UTC")},  // Timestamp(true, ...) rows expect timezone "UTC"
+      {"timestamp(true, us)",
+       LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS),
+       ParquetType::INT64, -1, ::arrow::timestamp(::arrow::TimeUnit::MICRO, "UTC")},
+      {"timestamp(true, ns)",
+       LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::NANOS),
+       ParquetType::INT64, -1, ::arrow::timestamp(::arrow::TimeUnit::NANO, "UTC")},
+      {"timestamp(false, ms)",
+       LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MILLIS),
+       ParquetType::INT64, -1, ::arrow::timestamp(::arrow::TimeUnit::MILLI)},  // Timestamp(false, ...) rows expect no timezone
+      {"timestamp(false, us)",
+       LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MICROS),
+       ParquetType::INT64, -1, ::arrow::timestamp(::arrow::TimeUnit::MICRO)},
+      {"timestamp(false, ns)",
+       LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::NANOS),
+       ParquetType::INT64, -1, ::arrow::timestamp(::arrow::TimeUnit::NANO)},
+      {"int(8, false)", LogicalAnnotation::Int(8, false), ParquetType::INT32, -1,
+       ::arrow::uint8()},
+      {"int(8, true)", LogicalAnnotation::Int(8, true), ParquetType::INT32, -1,
+       ::arrow::int8()},
+      {"int(16, false)", LogicalAnnotation::Int(16, false), ParquetType::INT32, -1,
+       ::arrow::uint16()},
+      {"int(16, true)", LogicalAnnotation::Int(16, true), ParquetType::INT32, -1,
+       ::arrow::int16()},
+      {"int(32, false)", LogicalAnnotation::Int(32, false), ParquetType::INT32, -1,
+       ::arrow::uint32()},
+      {"int(32, true)", LogicalAnnotation::Int(32, true), ParquetType::INT32, -1,
+       ::arrow::int32()},
+      {"int(64, false)", LogicalAnnotation::Int(64, false), ParquetType::INT64, -1,
+       ::arrow::uint64()},
+      {"int(64, true)", LogicalAnnotation::Int(64, true), ParquetType::INT64, -1,
+       ::arrow::int64()},
+      {"json", LogicalAnnotation::JSON(), ParquetType::BYTE_ARRAY, -1, ::arrow::binary()},
+      {"bson", LogicalAnnotation::BSON(), ParquetType::BYTE_ARRAY, -1, ::arrow::binary()},
+      {"interval", LogicalAnnotation::Interval(), ParquetType::FIXED_LEN_BYTE_ARRAY, 12,
+       ::arrow::fixed_size_binary(12)},
+      {"uuid", LogicalAnnotation::UUID(), ParquetType::FIXED_LEN_BYTE_ARRAY, 16,
+       ::arrow::fixed_size_binary(16)},
+      {"none", LogicalAnnotation::None(), ParquetType::BOOLEAN, -1, ::arrow::boolean()},
+      {"none", LogicalAnnotation::None(), ParquetType::INT32, -1, ::arrow::int32()},
+      {"none", LogicalAnnotation::None(), ParquetType::INT64, -1, ::arrow::int64()},
+      {"none", LogicalAnnotation::None(), ParquetType::FLOAT, -1, ::arrow::float32()},
+      {"none", LogicalAnnotation::None(), ParquetType::DOUBLE, -1, ::arrow::float64()},
+      {"none", LogicalAnnotation::None(), ParquetType::BYTE_ARRAY, -1, ::arrow::binary()},
+      {"none", LogicalAnnotation::None(), ParquetType::FIXED_LEN_BYTE_ARRAY, 64,
+       ::arrow::fixed_size_binary(64)},
+      {"null", LogicalAnnotation::Null(), ParquetType::BYTE_ARRAY, -1, ::arrow::null()},  // Null annotation expects arrow::null() even though storage is BYTE_ARRAY
+  };
+
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+
+  for (const FieldConstructionArguments& c : cases) {  // build the Parquet input and the expected Arrow field in lockstep
+    parquet_fields.push_back(PrimitiveNode::Make(
+        c.name, Repetition::OPTIONAL, c.annotation, c.physical_type, c.physical_length));
+    arrow_fields.push_back(std::make_shared<Field>(c.name, c.datatype));
+  }
+
+  ASSERT_OK(ConvertSchema(parquet_fields));  // NOTE(review): fixture helper defined outside this hunk
+  auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+  ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));  // presumably compares against the schema ConvertSchema stored - confirm in the fixture
+}
+
 TEST_F(TestConvertParquetSchema, DuplicateFieldNames) {
   std::vector<NodePtr> parquet_fields;
   std::vector<std::shared_ptr<Field>> arrow_fields;
@@ -586,6 +686,7 @@ TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartialOrdering) {
 
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(arrow_schema));
 }
+
 TEST_F(TestConvertParquetSchema, ParquetRepeatedNestedSchema) {
   std::vector<NodePtr> parquet_fields;
   std::vector<std::shared_ptr<Field>> arrow_fields;
@@ -686,12 +787,14 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
   parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
                                                ParquetType::INT64,
                                                LogicalType::TIMESTAMP_MILLIS));
-  arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
+  arrow_fields.push_back(std::make_shared<Field>(
+      "timestamp", ::arrow::timestamp(TimeUnit::MILLI, "UTC"), false));
 
   parquet_fields.push_back(PrimitiveNode::Make("timestamp[us]", Repetition::REQUIRED,
                                                ParquetType::INT64,
                                                LogicalType::TIMESTAMP_MICROS));
-  arrow_fields.push_back(std::make_shared<Field>("timestamp[us]", TIMESTAMP_US, false));
+  arrow_fields.push_back(std::make_shared<Field>(
+      "timestamp[us]", ::arrow::timestamp(TimeUnit::MICRO, "UTC"), false));
 
   parquet_fields.push_back(
       PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
@@ -714,6 +817,113 @@ TEST_F(TestConvertArrowSchema, ParquetFlatPrimitives) {
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
 }
 
+TEST_F(TestConvertArrowSchema, ArrowFields) {  // Arrow type -> expected Parquet annotation + physical type
+  struct FieldConstructionArguments {  // one table row: Arrow input type plus the Parquet node expected back
+    std::string name;
+    std::shared_ptr<::arrow::DataType> datatype;  // Arrow type handed to ConvertSchema
+    std::shared_ptr<const LogicalAnnotation> annotation;  // annotation expected on the resulting Parquet node
+    parquet::Type::type physical_type;
+    int physical_length;  // expected type length; -1 in every non-FIXED_LEN_BYTE_ARRAY row
+  };
+
+  std::vector<FieldConstructionArguments> cases = {
+      {"boolean", ::arrow::boolean(), LogicalAnnotation::None(), ParquetType::BOOLEAN,
+       -1},
+      {"binary", ::arrow::binary(), LogicalAnnotation::None(), ParquetType::BYTE_ARRAY,
+       -1},
+      {"fixed_size_binary", ::arrow::fixed_size_binary(64), LogicalAnnotation::None(),
+       ParquetType::FIXED_LEN_BYTE_ARRAY, 64},
+      {"uint8", ::arrow::uint8(), LogicalAnnotation::Int(8, false), ParquetType::INT32,
+       -1},
+      {"int8", ::arrow::int8(), LogicalAnnotation::Int(8, true), ParquetType::INT32, -1},
+      {"uint16", ::arrow::uint16(), LogicalAnnotation::Int(16, false), ParquetType::INT32,
+       -1},
+      {"int16", ::arrow::int16(), LogicalAnnotation::Int(16, true), ParquetType::INT32,
+       -1},
+      {"uint32", ::arrow::uint32(), LogicalAnnotation::None(), ParquetType::INT64,
+       -1},  // Parquet 1.0
+      {"int32", ::arrow::int32(), LogicalAnnotation::None(), ParquetType::INT32, -1},
+      {"uint64", ::arrow::uint64(), LogicalAnnotation::Int(64, false), ParquetType::INT64,
+       -1},
+      {"int64", ::arrow::int64(), LogicalAnnotation::None(), ParquetType::INT64, -1},
+      {"float32", ::arrow::float32(), LogicalAnnotation::None(), ParquetType::FLOAT, -1},
+      {"float64", ::arrow::float64(), LogicalAnnotation::None(), ParquetType::DOUBLE, -1},
+      {"utf8", ::arrow::utf8(), LogicalAnnotation::String(), ParquetType::BYTE_ARRAY, -1},
+      {"decimal(1, 0)", ::arrow::decimal(1, 0), LogicalAnnotation::Decimal(1, 0),
+       ParquetType::FIXED_LEN_BYTE_ARRAY, 1},  // decimal rows expect FIXED_LEN_BYTE_ARRAY with a length that grows with precision
+      {"decimal(8, 2)", ::arrow::decimal(8, 2), LogicalAnnotation::Decimal(8, 2),
+       ParquetType::FIXED_LEN_BYTE_ARRAY, 4},
+      {"decimal(16, 4)", ::arrow::decimal(16, 4), LogicalAnnotation::Decimal(16, 4),
+       ParquetType::FIXED_LEN_BYTE_ARRAY, 7},
+      {"decimal(32, 8)", ::arrow::decimal(32, 8), LogicalAnnotation::Decimal(32, 8),
+       ParquetType::FIXED_LEN_BYTE_ARRAY, 14},
+      {"time32", ::arrow::time32(::arrow::TimeUnit::MILLI),
+       LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MILLIS),
+       ParquetType::INT32, -1},
+      {"time64(microsecond)", ::arrow::time64(::arrow::TimeUnit::MICRO),
+       LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MICROS),
+       ParquetType::INT64, -1},
+      {"time64(nanosecond)", ::arrow::time64(::arrow::TimeUnit::NANO),
+       LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::NANOS),
+       ParquetType::INT64, -1},
+      {"timestamp(millisecond)", ::arrow::timestamp(::arrow::TimeUnit::MILLI),
+       LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MILLIS),
+       ParquetType::INT64, -1},
+      {"timestamp(microsecond)", ::arrow::timestamp(::arrow::TimeUnit::MICRO),
+       LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MICROS),
+       ParquetType::INT64, -1},
+      {"timestamp(nanosecond)", ::arrow::timestamp(::arrow::TimeUnit::NANO),
+       LogicalAnnotation::Timestamp(false, LogicalAnnotation::TimeUnit::MICROS),
+       ParquetType::INT64, -1},  // NANO input expects MICROS - presumably the default (Parquet 1.0) writer coercion; confirm
+      {"timestamp(millisecond, UTC)", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "UTC"),
+       LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MILLIS),
+       ParquetType::INT64, -1},
+      {"timestamp(microsecond, UTC)", ::arrow::timestamp(::arrow::TimeUnit::MICRO, "UTC"),
+       LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS),
+       ParquetType::INT64, -1},
+      {"timestamp(nanosecond, UTC)", ::arrow::timestamp(::arrow::TimeUnit::NANO, "UTC"),
+       LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS),
+       ParquetType::INT64, -1},  // NANO input expects MICROS, as in the zone-less row
+      {"timestamp(millisecond, CET)", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "CET"),
+       LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MILLIS),
+       ParquetType::INT64, -1},  // a non-UTC zone such as CET still expects Timestamp(true, ...)
+      {"timestamp(microsecond, CET)", ::arrow::timestamp(::arrow::TimeUnit::MICRO, "CET"),
+       LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS),
+       ParquetType::INT64, -1},
+      {"timestamp(nanosecond, CET)", ::arrow::timestamp(::arrow::TimeUnit::NANO, "CET"),
+       LogicalAnnotation::Timestamp(true, LogicalAnnotation::TimeUnit::MICROS),
+       ParquetType::INT64, -1},  // NANO input expects MICROS, as in the zone-less row
+      {"null", ::arrow::null(), LogicalAnnotation::Null(), ParquetType::INT32, -1}};
+
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+  std::vector<NodePtr> parquet_fields;
+
+  for (const FieldConstructionArguments& c : cases) {  // build the Arrow input and the expected Parquet node in lockstep
+    arrow_fields.push_back(std::make_shared<Field>(c.name, c.datatype, false));  // third argument false pairs with Repetition::REQUIRED below
+    parquet_fields.push_back(PrimitiveNode::Make(
+        c.name, Repetition::REQUIRED, c.annotation, c.physical_type, c.physical_length));
+  }
+
+  ASSERT_OK(ConvertSchema(arrow_fields));  // NOTE(review): fixture helper defined outside this hunk
+  ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));  // presumably compares against the schema ConvertSchema stored - confirm in the fixture
+}
+
+TEST_F(TestConvertArrowSchema, ArrowNonconvertibleFields) {  // Arrow types for which ConvertSchema must report NotImplemented
+  struct FieldConstructionArguments {  // one table row: field name and the Arrow type under test
+    std::string name;
+    std::shared_ptr<::arrow::DataType> datatype;
+  };
+
+  std::vector<FieldConstructionArguments> cases = {
+      {"float16", ::arrow::float16()},
+  };
+
+  for (const FieldConstructionArguments& c : cases) {
+    auto field = std::make_shared<Field>(c.name, c.datatype);
+    ASSERT_RAISES(NotImplemented, ConvertSchema({field}));  // each type is converted alone, so one failure cannot mask another
+  }
+}
+
 TEST_F(TestConvertArrowSchema, ParquetFlatPrimitivesAsDictionaries) {
   std::vector<NodePtr> parquet_fields;
   std::vector<std::shared_ptr<Field>> arrow_fields;
@@ -809,15 +1019,6 @@ TEST_F(TestConvertArrowSchema, ParquetLists) {
   ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
 }
 
-TEST_F(TestConvertArrowSchema, UnsupportedTypes) {
-  std::vector<std::shared_ptr<Field>> unsupported_fields = {
-      ::arrow::field("f0", ::arrow::time64(TimeUnit::NANO))};
-
-  for (const auto& field : unsupported_fields) {
-    ASSERT_RAISES(NotImplemented, ConvertSchema({field}));
-  }
-}
-
 TEST_F(TestConvertArrowSchema, ParquetFlatDecimals) {
   std::vector<NodePtr> parquet_fields;
   std::vector<std::shared_ptr<Field>> arrow_fields;
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index bdff716..5665603 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -1613,7 +1613,11 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read,
           TRANSFER_DATA(::arrow::TimestampType, Int64Type);
         } break;
         case ::arrow::TimeUnit::NANO: {
-          TRANSFER_DATA(::arrow::TimestampType, Int96Type);
+          if (descr_->physical_type() == ::parquet::Type::INT96) {
+            TRANSFER_DATA(::arrow::TimestampType, Int96Type);
+          } else {
+            TRANSFER_DATA(::arrow::TimestampType, Int64Type);
+          }
         } break;
         default:
           return Status::NotImplemented("TimeUnit not supported");
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index 45b4b38..22b8297 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -17,6 +17,7 @@
 
 #include "parquet/arrow/schema.h"
 
+#include <algorithm>
 #include <string>
 #include <unordered_set>
 #include <utility>
@@ -25,6 +26,7 @@
 #include "arrow/array.h"
 #include "arrow/status.h"
 #include "arrow/type.h"
+#include "arrow/util/checked_cast.h"
 #include "arrow/util/logging.h"
 
 #include "parquet/arrow/writer.h"
@@ -35,6 +37,7 @@
 
 using arrow::Field;
 using arrow::Status;
+using arrow::internal::checked_cast;
 
 using ArrowType = arrow::DataType;
 using ArrowTypeId = arrow::Type;
@@ -46,6 +49,7 @@ using parquet::schema::NodePtr;
 using parquet::schema::PrimitiveNode;
 
 using ParquetType = parquet::Type;
+using parquet::LogicalAnnotation;
 using parquet::LogicalType;
 
 namespace parquet {
@@ -56,117 +60,201 @@ const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI);
 const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO);
 const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO);
 
-std::shared_ptr<ArrowType> MakeDecimal128Type(const PrimitiveNode& node) {
-  const auto& metadata = node.decimal_metadata();
-  return ::arrow::decimal(metadata.precision, metadata.scale);
+static Status MakeArrowDecimal(const LogicalAnnotation& annotation,
+                               std::shared_ptr<ArrowType>* out) {  // writes an Arrow decimal with the annotation's precision and scale to *out
+  const auto& decimal = checked_cast<const DecimalAnnotation&>(annotation);  // assumes the caller already matched annotation.type() == DECIMAL
+  *out = ::arrow::decimal(decimal.precision(), decimal.scale());
+  return Status::OK();  // no failure path in this helper
 }
 
-static Status FromByteArray(const PrimitiveNode& node, std::shared_ptr<ArrowType>* out) {
-  switch (node.logical_type()) {
-    case LogicalType::UTF8:
-      *out = ::arrow::utf8();
+static Status MakeArrowInt(const LogicalAnnotation& annotation,
+                           std::shared_ptr<ArrowType>* out) {  // Arrow integer type for an Int annotation on an INT32 column
+  const auto& integer = checked_cast<const IntAnnotation&>(annotation);  // assumes the caller already matched annotation.type() == INT
+  switch (integer.bit_width()) {
+    case 8:
+      *out = integer.is_signed() ? ::arrow::int8() : ::arrow::uint8();
       break;
-    case LogicalType::DECIMAL:
-      *out = MakeDecimal128Type(node);
+    case 16:
+      *out = integer.is_signed() ? ::arrow::int16() : ::arrow::uint16();
       break;
-    default:
-      // BINARY
-      *out = ::arrow::binary();
+    case 32:
+      *out = integer.is_signed() ? ::arrow::int32() : ::arrow::uint32();
       break;
+    default:  // any other width (e.g. 64) is rejected for INT32 storage
+      return Status::TypeError(annotation.ToString(),
+                               " can not annotate physical type Int32");
   }
   return Status::OK();
 }
 
-static Status FromFLBA(const PrimitiveNode& node, std::shared_ptr<ArrowType>* out) {
-  switch (node.logical_type()) {
-    case LogicalType::NONE:
-      *out = ::arrow::fixed_size_binary(node.type_length());
+static Status MakeArrowInt64(const LogicalAnnotation& annotation,
+                             std::shared_ptr<ArrowType>* out) {  // Arrow integer type for an Int annotation on an INT64 column
+  const auto& integer = checked_cast<const IntAnnotation&>(annotation);  // assumes the caller already matched annotation.type() == INT
+  switch (integer.bit_width()) {
+    case 64:
+      *out = integer.is_signed() ? ::arrow::int64() : ::arrow::uint64();
       break;
-    case LogicalType::DECIMAL:
-      *out = MakeDecimal128Type(node);
+    default:  // only 64-bit annotations are accepted for INT64 storage
+      return Status::TypeError(annotation.ToString(),
+                               " can not annotate physical type Int64");
+  }
+  return Status::OK();
+}
+
+static Status MakeArrowTime32(const LogicalAnnotation& annotation,
+                              std::shared_ptr<ArrowType>* out) {  // Arrow time32 type for a Time annotation on an INT32 column
+  const auto& time = checked_cast<const TimeAnnotation&>(annotation);  // assumes the caller already matched annotation.type() == TIME
+  switch (time.time_unit()) {
+    case LogicalAnnotation::TimeUnit::MILLIS:
+      *out = ::arrow::time32(::arrow::TimeUnit::MILLI);
       break;
     default:
-      return Status::NotImplemented("Unhandled logical type ",
-                                    LogicalTypeToString(node.logical_type()),
-                                    " for fixed-length binary array");
+      return Status::TypeError(annotation.ToString(),  // every unit other than MILLIS is rejected; message names the Parquet physical type
+                               " can not annotate physical type Int32");
   }
+  return Status::OK();
+}
 
+static Status MakeArrowTime64(const LogicalAnnotation& annotation,
+                              std::shared_ptr<ArrowType>* out) {  // Arrow time64 type for a Time annotation on an INT64 column
+  const auto& time = checked_cast<const TimeAnnotation&>(annotation);  // assumes the caller already matched annotation.type() == TIME
+  switch (time.time_unit()) {
+    case LogicalAnnotation::TimeUnit::MICROS:
+      *out = ::arrow::time64(::arrow::TimeUnit::MICRO);
+      break;
+    case LogicalAnnotation::TimeUnit::NANOS:
+      *out = ::arrow::time64(::arrow::TimeUnit::NANO);
+      break;
+    default:  // every unit other than MICROS/NANOS is rejected; message names the Parquet physical type
+      return Status::TypeError(annotation.ToString(),
+                               " can not annotate physical type Int64");
+  }
   return Status::OK();
 }
 
-static Status FromInt32(const PrimitiveNode& node, std::shared_ptr<ArrowType>* out) {
-  switch (node.logical_type()) {
-    case LogicalType::NONE:
-      *out = ::arrow::int32();
+static Status MakeArrowTimestamp(const LogicalAnnotation& annotation,
+                                 std::shared_ptr<ArrowType>* out) {  // Arrow timestamp type for a Timestamp annotation
+  static const char* utc = "UTC";  // timezone string used when the annotation is UTC-adjusted
+  const auto& timestamp = checked_cast<const TimestampAnnotation&>(annotation);  // assumes the caller already matched annotation.type() == TIMESTAMP
+  switch (timestamp.time_unit()) {
+    case LogicalAnnotation::TimeUnit::MILLIS:
+      *out = (timestamp.is_adjusted_to_utc()  // UTC-adjusted -> zone "UTC"; otherwise no zone
+                  ? ::arrow::timestamp(::arrow::TimeUnit::MILLI, utc)
+                  : ::arrow::timestamp(::arrow::TimeUnit::MILLI));
+      break;
+    case LogicalAnnotation::TimeUnit::MICROS:
+      *out = (timestamp.is_adjusted_to_utc()
+                  ? ::arrow::timestamp(::arrow::TimeUnit::MICRO, utc)
+                  : ::arrow::timestamp(::arrow::TimeUnit::MICRO));
+      break;
+    case LogicalAnnotation::TimeUnit::NANOS:
+      *out = (timestamp.is_adjusted_to_utc()
+                  ? ::arrow::timestamp(::arrow::TimeUnit::NANO, utc)
+                  : ::arrow::timestamp(::arrow::TimeUnit::NANO));
       break;
-    case LogicalType::UINT_8:
-      *out = ::arrow::uint8();
+    default:  // any unit outside MILLIS/MICROS/NANOS
+      return Status::TypeError("Unrecognized time unit in timestamp annotation: ",
+                               annotation.ToString());
+  }
+  return Status::OK();
+}
+
+static Status FromByteArray(const LogicalAnnotation& annotation,
+                            std::shared_ptr<ArrowType>* out) {  // Arrow type for a BYTE_ARRAY column carrying this annotation
+  switch (annotation.type()) {
+    case LogicalAnnotation::Type::STRING:
+      *out = ::arrow::utf8();
       break;
-    case LogicalType::INT_8:
-      *out = ::arrow::int8();
+    case LogicalAnnotation::Type::DECIMAL:
+      RETURN_NOT_OK(MakeArrowDecimal(annotation, out));
       break;
-    case LogicalType::UINT_16:
-      *out = ::arrow::uint16();
+    case LogicalAnnotation::Type::NONE:
+    case LogicalAnnotation::Type::ENUM:
+    case LogicalAnnotation::Type::JSON:
+    case LogicalAnnotation::Type::BSON:
+      *out = ::arrow::binary();  // unannotated, ENUM, JSON and BSON all map to plain binary
       break;
-    case LogicalType::INT_16:
-      *out = ::arrow::int16();
+    default:  // every other annotation type is reported as NotImplemented
+      return Status::NotImplemented("Unhandled logical annotation ",
+                                    annotation.ToString(), " for binary array");
+  }
+  return Status::OK();
+}
+
+static Status FromFLBA(const LogicalAnnotation& annotation, int32_t physical_length,
+                       std::shared_ptr<ArrowType>* out) {  // Arrow type for a FIXED_LEN_BYTE_ARRAY column of physical_length bytes
+  switch (annotation.type()) {
+    case LogicalAnnotation::Type::DECIMAL:
+      RETURN_NOT_OK(MakeArrowDecimal(annotation, out));  // physical_length is not consulted for decimals
       break;
-    case LogicalType::INT_32:
-      *out = ::arrow::int32();
+    case LogicalAnnotation::Type::NONE:
+    case LogicalAnnotation::Type::INTERVAL:
+    case LogicalAnnotation::Type::UUID:
+      *out = ::arrow::fixed_size_binary(physical_length);  // unannotated, INTERVAL and UUID all map to fixed-size binary of that width
       break;
-    case LogicalType::UINT_32:
-      *out = ::arrow::uint32();
+    default:  // every other annotation type is reported as NotImplemented
+      return Status::NotImplemented("Unhandled logical annotation ",
+                                    annotation.ToString(),
+                                    " for fixed-length binary array");
+  }
+
+  return Status::OK();
+}
+
+static Status FromInt32(const LogicalAnnotation& annotation,
+                        std::shared_ptr<ArrowType>* out) {  // Arrow type for an INT32 column carrying this annotation
+  switch (annotation.type()) {
+    case LogicalAnnotation::Type::INT:
+      RETURN_NOT_OK(MakeArrowInt(annotation, out));  // TypeError unless the bit width is 8, 16 or 32
       break;
-    case LogicalType::DATE:
+    case LogicalAnnotation::Type::DATE:
       *out = ::arrow::date32();
       break;
-    case LogicalType::TIME_MILLIS:
-      *out = ::arrow::time32(::arrow::TimeUnit::MILLI);
+    case LogicalAnnotation::Type::TIME:
+      RETURN_NOT_OK(MakeArrowTime32(annotation, out));  // TypeError unless the unit is MILLIS
+      break;
+    case LogicalAnnotation::Type::DECIMAL:
+      RETURN_NOT_OK(MakeArrowDecimal(annotation, out));
       break;
-    case LogicalType::DECIMAL:
-      *out = MakeDecimal128Type(node);
+    case LogicalAnnotation::Type::NONE:
+      *out = ::arrow::int32();  // an unannotated INT32 column maps to plain int32
       break;
     default:
-      return Status::NotImplemented("Unhandled logical type ",
-                                    LogicalTypeToString(node.logical_type()),
+      return Status::NotImplemented("Unhandled logical type ", annotation.ToString(),
                                     " for INT32");
   }
   return Status::OK();
 }
 
-static Status FromInt64(const PrimitiveNode& node, std::shared_ptr<ArrowType>* out) {
-  switch (node.logical_type()) {
-    case LogicalType::NONE:
-      *out = ::arrow::int64();
-      break;
-    case LogicalType::INT_64:
-      *out = ::arrow::int64();
-      break;
-    case LogicalType::UINT_64:
-      *out = ::arrow::uint64();
+static Status FromInt64(const LogicalAnnotation& annotation,
+                        std::shared_ptr<ArrowType>* out) {  // Arrow type for an INT64 column carrying this annotation
+  switch (annotation.type()) {
+    case LogicalAnnotation::Type::INT:
+      RETURN_NOT_OK(MakeArrowInt64(annotation, out));  // TypeError unless the bit width is 64
       break;
-    case LogicalType::DECIMAL:
-      *out = MakeDecimal128Type(node);
+    case LogicalAnnotation::Type::DECIMAL:
+      RETURN_NOT_OK(MakeArrowDecimal(annotation, out));
       break;
-    case LogicalType::TIMESTAMP_MILLIS:
-      *out = TIMESTAMP_MS;
+    case LogicalAnnotation::Type::TIMESTAMP:
+      RETURN_NOT_OK(MakeArrowTimestamp(annotation, out));  // unit and UTC flag are both read from the annotation
       break;
-    case LogicalType::TIMESTAMP_MICROS:
-      *out = TIMESTAMP_US;
+    case LogicalAnnotation::Type::TIME:
+      RETURN_NOT_OK(MakeArrowTime64(annotation, out));  // TypeError unless the unit is MICROS or NANOS
       break;
-    case LogicalType::TIME_MICROS:
-      *out = ::arrow::time64(::arrow::TimeUnit::MICRO);
+    case LogicalAnnotation::Type::NONE:
+      *out = ::arrow::int64();  // an unannotated INT64 column maps to plain int64
       break;
     default:
-      return Status::NotImplemented("Unhandled logical type ",
-                                    LogicalTypeToString(node.logical_type()),
+      return Status::NotImplemented("Unhandled logical type ", annotation.ToString(),
                                     " for INT64");
   }
   return Status::OK();
 }
 
 Status FromPrimitive(const PrimitiveNode& primitive, std::shared_ptr<ArrowType>* out) {
-  if (primitive.logical_type() == LogicalType::NA) {
+  const std::shared_ptr<const LogicalAnnotation>& annotation =
+      primitive.logical_annotation();
+  if (annotation->is_invalid() || annotation->is_null()) {
     *out = ::arrow::null();
     return Status::OK();
   }
@@ -176,10 +264,10 @@ Status FromPrimitive(const PrimitiveNode& primitive, std::shared_ptr<ArrowType>*
       *out = ::arrow::boolean();
       break;
     case ParquetType::INT32:
-      RETURN_NOT_OK(FromInt32(primitive, out));
+      RETURN_NOT_OK(FromInt32(*annotation, out));
       break;
     case ParquetType::INT64:
-      RETURN_NOT_OK(FromInt64(primitive, out));
+      RETURN_NOT_OK(FromInt64(*annotation, out));
       break;
     case ParquetType::INT96:
       *out = TIMESTAMP_NS;
@@ -191,10 +279,10 @@ Status FromPrimitive(const PrimitiveNode& primitive, std::shared_ptr<ArrowType>*
       *out = ::arrow::float64();
       break;
     case ParquetType::BYTE_ARRAY:
-      RETURN_NOT_OK(FromByteArray(primitive, out));
+      RETURN_NOT_OK(FromByteArray(*annotation, out));
       break;
     case ParquetType::FIXED_LEN_BYTE_ARRAY:
-      RETURN_NOT_OK(FromFLBA(primitive, out));
+      RETURN_NOT_OK(FromFLBA(*annotation, primitive.type_length(), out));
       break;
     default: {
       // PARQUET-1565: This can occur if the file is corrupt
@@ -321,7 +409,7 @@ Status NodeToFieldInternal(const Node& node,
     }
   } else if (node.is_group()) {
     const auto& group = static_cast<const GroupNode&>(node);
-    if (node.logical_type() == LogicalType::LIST) {
+    if (node.logical_annotation()->is_list()) {
       RETURN_NOT_OK(NodeToList(group, included_leaf_nodes, &type));
     } else {
       RETURN_NOT_OK(StructFromGroup(group, included_leaf_nodes, &type));
@@ -411,7 +499,7 @@ Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::str
   RETURN_NOT_OK(FieldToNode(type->value_field(), properties, arrow_properties, &element));
 
   NodePtr list = GroupNode::Make("list", Repetition::REPEATED, {element});
-  *out = GroupNode::Make(name, repetition, {list}, LogicalType::LIST);
+  *out = GroupNode::Make(name, repetition, {list}, LogicalAnnotation::List());
   return Status::OK();
 }
 
@@ -431,71 +519,96 @@ Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
   return Status::OK();
 }
 
-static LogicalType::type LogicalTypeFromArrowTimeUnit(::arrow::TimeUnit::type time_unit) {
+static std::shared_ptr<const LogicalAnnotation> TimestampAnnotationFromArrowTimestamp(
+    const ::arrow::TimestampType& timestamp_type, ::arrow::TimeUnit::type time_unit) {
+  const bool utc = !(timestamp_type.timezone().empty());
   switch (time_unit) {
     case ::arrow::TimeUnit::MILLI:
-      return LogicalType::TIMESTAMP_MILLIS;
+      return LogicalAnnotation::Timestamp(utc, LogicalAnnotation::TimeUnit::MILLIS);
     case ::arrow::TimeUnit::MICRO:
-      return LogicalType::TIMESTAMP_MICROS;
-    case ::arrow::TimeUnit::SECOND:
+      return LogicalAnnotation::Timestamp(utc, LogicalAnnotation::TimeUnit::MICROS);
     case ::arrow::TimeUnit::NANO:
+      return LogicalAnnotation::Timestamp(utc, LogicalAnnotation::TimeUnit::NANOS);
+    case ::arrow::TimeUnit::SECOND:
       // No equivalent parquet logical type.
       break;
   }
-
-  return LogicalType::NONE;
+  return LogicalAnnotation::None();
 }
 
 static Status GetTimestampMetadata(const ::arrow::TimestampType& type,
-                                   const ArrowWriterProperties& properties,
+                                   const WriterProperties& properties,
+                                   const ArrowWriterProperties& arrow_properties,
                                    ParquetType::type* physical_type,
-                                   LogicalType::type* logical_type) {
-  const bool coerce = properties.coerce_timestamps_enabled();
-  const auto unit = coerce ? properties.coerce_timestamps_unit() : type.unit();
+                                   std::shared_ptr<const LogicalAnnotation>* annotation) {
+  const bool coerce = arrow_properties.coerce_timestamps_enabled();
+  const auto target_unit =
+      coerce ? arrow_properties.coerce_timestamps_unit() : type.unit();
 
   // The user is explicitly asking for Impala int96 encoding, there is no
   // logical type.
-  if (properties.support_deprecated_int96_timestamps()) {
+  if (arrow_properties.support_deprecated_int96_timestamps()) {
     *physical_type = ParquetType::INT96;
     return Status::OK();
   }
 
   *physical_type = ParquetType::INT64;
-  *logical_type = LogicalTypeFromArrowTimeUnit(unit);
+  *annotation = TimestampAnnotationFromArrowTimestamp(type, target_unit);
 
-  // The user is requesting that all timestamp columns are casted to a specific
-  // type. Only 2 TimeUnit are supported by arrow-parquet.
+  // The user is explicitly asking for timestamp data to be converted to the
+  // specified units (target_unit).
   if (coerce) {
-    switch (unit) {
-      case ::arrow::TimeUnit::MILLI:
-      case ::arrow::TimeUnit::MICRO:
-        break;
-      case ::arrow::TimeUnit::NANO:
-      case ::arrow::TimeUnit::SECOND:
-        return Status::NotImplemented(
-            "Can only coerce Arrow timestamps to milliseconds"
-            " or microseconds");
+    if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) {
+      switch (target_unit) {
+        case ::arrow::TimeUnit::MILLI:
+        case ::arrow::TimeUnit::MICRO:
+          break;
+        case ::arrow::TimeUnit::NANO:
+        case ::arrow::TimeUnit::SECOND:
+          return Status::NotImplemented(
+              "For Parquet version 1.0 files, can only coerce Arrow timestamps to "
+              "milliseconds or microseconds");
+      }
+    } else {
+      switch (target_unit) {
+        case ::arrow::TimeUnit::MILLI:
+        case ::arrow::TimeUnit::MICRO:
+        case ::arrow::TimeUnit::NANO:
+          break;
+        case ::arrow::TimeUnit::SECOND:
+          return Status::NotImplemented(
+              "For Parquet files, can only coerce Arrow timestamps to milliseconds, "
+              "microseconds, or nanoseconds");
+      }
     }
+    return Status::OK();
+  }
 
+  // The user implicitly wants timestamp data to retain its original time units,
+  // however the ConvertedType field used to indicate logical types for Parquet
+  // version 1.0 fields does not allow for nanosecond time units and so nanoseconds
+  // must be coerced to microseconds.
+  if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0 &&
+      type.unit() == ::arrow::TimeUnit::NANO) {
+    *annotation = TimestampAnnotationFromArrowTimestamp(type, ::arrow::TimeUnit::MICRO);
     return Status::OK();
   }
 
-  // Until ARROW-3729 is resolved, nanoseconds are explicitly converted to
-  // int64 microseconds when deprecated int96 is not requested.
-  if (type.unit() == ::arrow::TimeUnit::NANO)
-    *logical_type = LogicalType::TIMESTAMP_MICROS;
-  else if (type.unit() == ::arrow::TimeUnit::SECOND)
-    return Status::NotImplemented(
-        "Only MILLI, MICRO, and NANOS units supported for Arrow timestamps with "
-        "Parquet.");
+  // The user implicitly wants timestamp data to retain its original time units,
+  // however the Arrow seconds time unit can not be represented (annotated) in
+  // any version of Parquet and so must be coerced to milliseconds.
+  if (type.unit() == ::arrow::TimeUnit::SECOND) {
+    *annotation = TimestampAnnotationFromArrowTimestamp(type, ::arrow::TimeUnit::MILLI);
+    return Status::OK();
+  }
 
   return Status::OK();
-}  // namespace arrow
+}
 
 Status FieldToNode(const std::shared_ptr<Field>& field,
                    const WriterProperties& properties,
                    const ArrowWriterProperties& arrow_properties, NodePtr* out) {
-  LogicalType::type logical_type = LogicalType::NONE;
+  std::shared_ptr<const LogicalAnnotation> annotation = LogicalAnnotation::None();
   ParquetType::type type;
   Repetition::type repetition =
       field->nullable() ? Repetition::OPTIONAL : Repetition::REQUIRED;
@@ -507,33 +620,33 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
   switch (field->type()->id()) {
     case ArrowTypeId::NA:
       type = ParquetType::INT32;
-      logical_type = LogicalType::NA;
+      annotation = LogicalAnnotation::Null();
       break;
     case ArrowTypeId::BOOL:
       type = ParquetType::BOOLEAN;
       break;
     case ArrowTypeId::UINT8:
       type = ParquetType::INT32;
-      logical_type = LogicalType::UINT_8;
+      annotation = LogicalAnnotation::Int(8, false);
       break;
     case ArrowTypeId::INT8:
       type = ParquetType::INT32;
-      logical_type = LogicalType::INT_8;
+      annotation = LogicalAnnotation::Int(8, true);
       break;
     case ArrowTypeId::UINT16:
       type = ParquetType::INT32;
-      logical_type = LogicalType::UINT_16;
+      annotation = LogicalAnnotation::Int(16, false);
       break;
     case ArrowTypeId::INT16:
       type = ParquetType::INT32;
-      logical_type = LogicalType::INT_16;
+      annotation = LogicalAnnotation::Int(16, true);
       break;
     case ArrowTypeId::UINT32:
       if (properties.version() == ::parquet::ParquetVersion::PARQUET_1_0) {
         type = ParquetType::INT64;
       } else {
         type = ParquetType::INT32;
-        logical_type = LogicalType::UINT_32;
+        annotation = LogicalAnnotation::Int(32, false);
       }
       break;
     case ArrowTypeId::INT32:
@@ -541,7 +654,7 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
       break;
     case ArrowTypeId::UINT64:
       type = ParquetType::INT64;
-      logical_type = LogicalType::UINT_64;
+      annotation = LogicalAnnotation::Int(64, false);
       break;
     case ArrowTypeId::INT64:
       type = ParquetType::INT64;
@@ -554,7 +667,7 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
       break;
     case ArrowTypeId::STRING:
       type = ParquetType::BYTE_ARRAY;
-      logical_type = LogicalType::UTF8;
+      annotation = LogicalAnnotation::String();
       break;
     case ArrowTypeId::BINARY:
       type = ParquetType::BYTE_ARRAY;
@@ -567,37 +680,38 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
     } break;
     case ArrowTypeId::DECIMAL: {
       type = ParquetType::FIXED_LEN_BYTE_ARRAY;
-      logical_type = LogicalType::DECIMAL;
       const auto& decimal_type =
           static_cast<const ::arrow::Decimal128Type&>(*field->type());
       precision = decimal_type.precision();
       scale = decimal_type.scale();
       length = DecimalSize(precision);
+      PARQUET_CATCH_NOT_OK(annotation = LogicalAnnotation::Decimal(precision, scale));
     } break;
     case ArrowTypeId::DATE32:
       type = ParquetType::INT32;
-      logical_type = LogicalType::DATE;
+      annotation = LogicalAnnotation::Date();
       break;
     case ArrowTypeId::DATE64:
       type = ParquetType::INT32;
-      logical_type = LogicalType::DATE;
+      annotation = LogicalAnnotation::Date();
       break;
     case ArrowTypeId::TIMESTAMP:
       RETURN_NOT_OK(
           GetTimestampMetadata(static_cast<::arrow::TimestampType&>(*field->type()),
-                               arrow_properties, &type, &logical_type));
+                               properties, arrow_properties, &type, &annotation));
       break;
     case ArrowTypeId::TIME32:
       type = ParquetType::INT32;
-      logical_type = LogicalType::TIME_MILLIS;
+      annotation = LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MILLIS);
       break;
     case ArrowTypeId::TIME64: {
+      type = ParquetType::INT64;
       auto time_type = static_cast<::arrow::Time64Type*>(field->type().get());
       if (time_type->unit() == ::arrow::TimeUnit::NANO) {
-        return Status::NotImplemented("Nanosecond time not supported in Parquet.");
+        annotation = LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::NANOS);
+      } else {
+        annotation = LogicalAnnotation::Time(false, LogicalAnnotation::TimeUnit::MICROS);
       }
-      type = ParquetType::INT64;
-      logical_type = LogicalType::TIME_MICROS;
     } break;
     case ArrowTypeId::STRUCT: {
       auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type());
@@ -625,9 +739,10 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
           field->type()->ToString());
     }
   }
-  PARQUET_CATCH_NOT_OK(*out =
-                           PrimitiveNode::Make(field->name(), repetition, type,
-                                               logical_type, length, precision, scale));
+
+  PARQUET_CATCH_NOT_OK(*out = PrimitiveNode::Make(field->name(), repetition, annotation,
+                                                  type, length));
+
   return Status::OK();
 }
 
@@ -725,7 +840,7 @@ int32_t DecimalSize(int32_t precision) {
   }
   DCHECK(false);
   return -1;
-}  // namespace arrow
+}
 
 }  // namespace arrow
 }  // namespace parquet
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 96db68b..9181120 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -383,9 +383,9 @@ class ArrowColumnWriter {
   Status WriteTimestamps(const Array& data, int64_t num_levels, const int16_t* def_levels,
                          const int16_t* rep_levels);
 
-  Status WriteTimestampsCoerce(const bool truncated_timestamps_allowed, const Array& data,
-                               int64_t num_levels, const int16_t* def_levels,
-                               const int16_t* rep_levels);
+  Status WriteTimestampsCoerce(const Array& data, int64_t num_levels,
+                               const int16_t* def_levels, const int16_t* rep_levels,
+                               const ArrowWriterProperties& properties);
 
   template <typename ParquetType, typename ArrowType>
   Status WriteNonNullableBatch(const ArrowType& type, int64_t num_values,
@@ -650,50 +650,92 @@ Status ArrowColumnWriter::WriteNonNullableBatch<Int96Type, ::arrow::TimestampTyp
 Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_levels,
                                           const int16_t* def_levels,
                                           const int16_t* rep_levels) {
-  const auto& type = static_cast<const ::arrow::TimestampType&>(*values.type());
-
-  const bool is_nanosecond = type.unit() == TimeUnit::NANO;
+  const auto& source_type = static_cast<const ::arrow::TimestampType&>(*values.type());
 
   if (ctx_->properties->support_deprecated_int96_timestamps()) {
-    // The user explicitly required to use Int96 storage.
+    // User explicitly requested Int96 timestamps
     return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(values, num_levels,
                                                               def_levels, rep_levels);
-  } else if (is_nanosecond ||
-             (ctx_->properties->coerce_timestamps_enabled() &&
-              (type.unit() != ctx_->properties->coerce_timestamps_unit()))) {
-    // Casting is required. This covers several cases
-    // * Nanoseconds -> cast to microseconds (until ARROW-3729 is resolved)
-    // * coerce_timestamps_enabled_, cast all timestamps to requested unit
-    return WriteTimestampsCoerce(ctx_->properties->truncated_timestamps_allowed(), values,
-                                 num_levels, def_levels, rep_levels);
+  } else if (ctx_->properties->coerce_timestamps_enabled()) {
+    // User explicitly requested coercion to specific unit
+    if (source_type.unit() == ctx_->properties->coerce_timestamps_unit()) {
+      // No data conversion necessary
+      return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(values, num_levels,
+                                                                def_levels, rep_levels);
+    } else {
+      return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels,
+                                   *(ctx_->properties));
+    }
+  } else if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0 &&
+             source_type.unit() == TimeUnit::NANO) {
+    // Absent superseding user instructions, when writing Parquet version 1.0 files,
+    // timestamps in nanoseconds are coerced to microseconds
+    std::shared_ptr<ArrowWriterProperties> properties =
+        (ArrowWriterProperties::Builder())
+            .coerce_timestamps(TimeUnit::MICRO)
+            ->disallow_truncated_timestamps()
+            ->build();
+    return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels, *properties);
+  } else if (source_type.unit() == TimeUnit::SECOND) {
+    // Absent superseding user instructions, timestamps in seconds are coerced to
+    // milliseconds
+    std::shared_ptr<ArrowWriterProperties> properties =
+        (ArrowWriterProperties::Builder()).coerce_timestamps(TimeUnit::MILLI)->build();
+    return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels, *properties);
   } else {
-    // No casting of timestamps is required, take the fast path
+    // No data conversion necessary
     return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(values, num_levels,
                                                               def_levels, rep_levels);
   }
 }
 
-Status ArrowColumnWriter::WriteTimestampsCoerce(const bool truncated_timestamps_allowed,
-                                                const Array& array, int64_t num_levels,
+#define COERCE_DIVIDE -1
+#define COERCE_INVALID 0
+#define COERCE_MULTIPLY +1
+
+static std::pair<int, int64_t> kTimestampCoercionFactors[4][4] = {
+    // from seconds ...
+    {{COERCE_INVALID, 0},                      // ... to seconds
+     {COERCE_MULTIPLY, 1000},                  // ... to millis
+     {COERCE_MULTIPLY, 1000000},               // ... to micros
+     {COERCE_MULTIPLY, INT64_C(1000000000)}},  // ... to nanos
+    // from millis ...
+    {{COERCE_INVALID, 0},
+     {COERCE_MULTIPLY, 1},
+     {COERCE_MULTIPLY, 1000},
+     {COERCE_MULTIPLY, 1000000}},
+    // from micros ...
+    {{COERCE_INVALID, 0},
+     {COERCE_DIVIDE, 1000},
+     {COERCE_MULTIPLY, 1},
+     {COERCE_MULTIPLY, 1000}},
+    // from nanos ...
+    {{COERCE_INVALID, 0},
+     {COERCE_DIVIDE, 1000000},
+     {COERCE_DIVIDE, 1000},
+     {COERCE_MULTIPLY, 1}}};
+
+Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t num_levels,
                                                 const int16_t* def_levels,
-                                                const int16_t* rep_levels) {
+                                                const int16_t* rep_levels,
+                                                const ArrowWriterProperties& properties) {
   int64_t* buffer;
   RETURN_NOT_OK(ctx_->GetScratchData<int64_t>(num_levels, &buffer));
 
   const auto& data = static_cast<const ::arrow::TimestampArray&>(array);
-
   auto values = data.raw_values();
-  const auto& type = static_cast<const ::arrow::TimestampType&>(*array.type());
 
-  TimeUnit::type target_unit = ctx_->properties->coerce_timestamps_enabled()
-                                   ? ctx_->properties->coerce_timestamps_unit()
-                                   : TimeUnit::MICRO;
+  const auto& source_type = static_cast<const ::arrow::TimestampType&>(*array.type());
+  auto source_unit = source_type.unit();
+
+  TimeUnit::type target_unit = properties.coerce_timestamps_unit();
   auto target_type = ::arrow::timestamp(target_unit);
+  bool truncation_allowed = properties.truncated_timestamps_allowed();
 
   auto DivideBy = [&](const int64_t factor) {
     for (int64_t i = 0; i < array.length(); i++) {
-      if (!truncated_timestamps_allowed && !data.IsNull(i) && (values[i] % factor != 0)) {
-        return Status::Invalid("Casting from ", type.ToString(), " to ",
+      if (!truncation_allowed && !data.IsNull(i) && (values[i] % factor != 0)) {
+        return Status::Invalid("Casting from ", source_type.ToString(), " to ",
                                target_type->ToString(), " would lose data: ", values[i]);
       }
       buffer[i] = values[i] / factor;
@@ -708,22 +750,12 @@ Status ArrowColumnWriter::WriteTimestampsCoerce(const bool truncated_timestamps_
     return Status::OK();
   };
 
-  if (type.unit() == TimeUnit::NANO) {
-    if (target_unit == TimeUnit::MICRO) {
-      RETURN_NOT_OK(DivideBy(1000));
-    } else {
-      DCHECK_EQ(TimeUnit::MILLI, target_unit);
-      RETURN_NOT_OK(DivideBy(1000000));
-    }
-  } else if (type.unit() == TimeUnit::SECOND) {
-    RETURN_NOT_OK(MultiplyBy(target_unit == TimeUnit::MICRO ? 1000000 : 1000));
-  } else if (type.unit() == TimeUnit::MILLI) {
-    DCHECK_EQ(TimeUnit::MICRO, target_unit);
-    RETURN_NOT_OK(MultiplyBy(1000));
-  } else {
-    DCHECK_EQ(TimeUnit::MILLI, target_unit);
-    RETURN_NOT_OK(DivideBy(1000));
-  }
+  const auto& coercion = kTimestampCoercionFactors[static_cast<int>(source_unit)]
+                                                  [static_cast<int>(target_unit)];
+  // .first -> coercion operation; .second -> scale factor
+  DCHECK_NE(coercion.first, COERCE_INVALID);
+  RETURN_NOT_OK(coercion.first == COERCE_DIVIDE ? DivideBy(coercion.second)
+                                                : MultiplyBy(coercion.second));
 
   if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
     // no nulls, just dump the data
@@ -736,9 +768,14 @@ Status ArrowColumnWriter::WriteTimestampsCoerce(const bool truncated_timestamps_
         static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
         num_levels, def_levels, rep_levels, valid_bits, data.offset(), buffer)));
   }
+
   return Status::OK();
 }
 
+#undef COERCE_DIVIDE
+#undef COERCE_INVALID
+#undef COERCE_MULTIPLY
+
 // This specialization seems quite similar but it significantly differs in two points:
 // * offset is added at the most latest time to the pointer as we have sub-byte access
 // * Arrow data is stored bitwise thus we cannot use std::copy to transform from
diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc
index db48b24..ee81af3 100644
--- a/cpp/src/parquet/types.cc
+++ b/cpp/src/parquet/types.cc
@@ -498,11 +498,13 @@ std::shared_ptr<const LogicalAnnotation> LogicalAnnotation::Date() {
 
 std::shared_ptr<const LogicalAnnotation> LogicalAnnotation::Time(
     bool is_adjusted_to_utc, LogicalAnnotation::TimeUnit::unit time_unit) {
+  DCHECK(time_unit != LogicalAnnotation::TimeUnit::UNKNOWN);
   return TimeAnnotation::Make(is_adjusted_to_utc, time_unit);
 }
 
 std::shared_ptr<const LogicalAnnotation> LogicalAnnotation::Timestamp(
     bool is_adjusted_to_utc, LogicalAnnotation::TimeUnit::unit time_unit) {
+  DCHECK(time_unit != LogicalAnnotation::TimeUnit::UNKNOWN);
   return TimestampAnnotation::Make(is_adjusted_to_utc, time_unit);
 }
 
@@ -512,6 +514,7 @@ std::shared_ptr<const LogicalAnnotation> LogicalAnnotation::Interval() {
 
 std::shared_ptr<const LogicalAnnotation> LogicalAnnotation::Int(int bit_width,
                                                                 bool is_signed) {
+  DCHECK(bit_width == 64 || bit_width == 32 || bit_width == 16 || bit_width == 8);
   return IntAnnotation::Make(bit_width, is_signed);
 }
 
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index e99dea0..49a03d6 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -190,9 +190,6 @@ def test_chunked_table_write():
     # ARROW-232
     df = alltypes_sample(size=10)
 
-    # The nanosecond->ms conversion is a nuisance, so we just avoid it here
-    del df['datetime']
-
     batch = pa.RecordBatch.from_pandas(df)
     table = pa.Table.from_batches([batch] * 3)
     _check_roundtrip(table, version='2.0')
@@ -206,8 +203,6 @@ def test_chunked_table_write():
 @pytest.mark.pandas
 def test_no_memory_map(tempdir):
     df = alltypes_sample(size=10)
-    # The nanosecond->us conversion is a nuisance, so we just avoid it here
-    del df['datetime']
 
     table = pa.Table.from_pandas(df)
     _check_roundtrip(table, read_table_kwargs={'memory_map': False},
@@ -234,8 +229,6 @@ def test_special_chars_filename(tempdir):
 @pytest.mark.pandas
 def test_empty_table_roundtrip():
     df = alltypes_sample(size=10)
-    # The nanosecond->us conversion is a nuisance, so we just avoid it here
-    del df['datetime']
 
     # Create a non-empty table to infer the types correctly, then slice to 0
     table = pa.Table.from_pandas(df)
@@ -941,7 +934,7 @@ def test_column_of_lists(tempdir):
 
 
 @pytest.mark.pandas
-def test_date_time_types():
+def test_date_time_types(tempdir):
     t1 = pa.date32()
     data1 = np.array([17259, 17260, 17261], dtype='int32')
     a1 = pa.array(data1, type=t1)
@@ -974,12 +967,6 @@ def test_date_time_types():
                      dtype='int64')
     a7 = pa.array(data7, type=t7)
 
-    t7_us = pa.timestamp('us')
-    start = pd.Timestamp('2001-01-01').value
-    data7_us = np.array([start, start + 1000, start + 2000],
-                        dtype='int64') // 1000
-    a7_us = pa.array(data7_us, type=t7_us)
-
     table = pa.Table.from_arrays([a1, a2, a3, a4, a5, a6, a7],
                                  ['date32', 'date64', 'timestamp[us]',
                                   'time32[s]', 'time64[us]',
@@ -988,8 +975,7 @@ def test_date_time_types():
 
     # date64 as date32
     # time32[s] to time32[ms]
-    # 'timestamp[ns]' to 'timestamp[us]'
-    expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7_us],
+    expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7],
                                     ['date32', 'date64', 'timestamp[us]',
                                      'time32[s]', 'time64[us]',
                                      'time32_from64[s]',
@@ -997,35 +983,62 @@ def test_date_time_types():
 
     _check_roundtrip(table, expected=expected, version='2.0')
 
-    # date64 as date32
-    # time32[s] to time32[ms]
-    # 'timestamp[ms]' is saved as INT96 timestamp
-    # 'timestamp[ns]' is saved as INT96 timestamp
-    expected = pa.Table.from_arrays([a1, a1, a7, a4, a5, ex_a6, a7],
-                                    ['date32', 'date64', 'timestamp[us]',
-                                     'time32[s]', 'time64[us]',
-                                     'time32_from64[s]',
-                                     'timestamp[ns]'])
-
-    _check_roundtrip(table, expected=expected, version='2.0',
-                     use_deprecated_int96_timestamps=True)
-
-    # Check that setting flavor to 'spark' uses int96 timestamps
-    _check_roundtrip(table, expected=expected, version='2.0',
-                     flavor='spark')
-
-    # Unsupported stuff
-    def _assert_unsupported(array):
-        table = pa.Table.from_arrays([array], ['unsupported'])
-        buf = io.BytesIO()
+    t0 = pa.timestamp('ms')
+    data0 = np.arange(4, dtype='int64')
+    a0 = pa.array(data0, type=t0)
 
-        with pytest.raises(NotImplementedError):
-            _write_table(table, buf, version="2.0")
+    t1 = pa.timestamp('us')
+    data1 = np.arange(4, dtype='int64')
+    a1 = pa.array(data1, type=t1)
 
-    t7 = pa.time64('ns')
-    a7 = pa.array(data4.astype('int64'), type=t7)
+    t2 = pa.timestamp('ns')
+    data2 = np.arange(4, dtype='int64')
+    a2 = pa.array(data2, type=t2)
 
-    _assert_unsupported(a7)
+    table = pa.Table.from_arrays([a0, a1, a2],
+                                 ['ts[ms]', 'ts[us]', 'ts[ns]'])
+    expected = pa.Table.from_arrays([a0, a1, a2],
+                                    ['ts[ms]', 'ts[us]', 'ts[ns]'])
+
+    # int64 for all timestamps supported by default
+    filename = tempdir / 'int64_timestamps.parquet'
+    _write_table(table, filename, version='2.0')
+    parquet_schema = pq.ParquetFile(filename).schema
+    for i in range(3):
+        assert parquet_schema.column(i).physical_type == 'INT64'
+    read_table = _read_table(filename)
+    assert read_table.equals(expected)
+
+    t0_ns = pa.timestamp('ns')
+    data0_ns = np.array(data0 * 1000000, dtype='int64')
+    a0_ns = pa.array(data0_ns, type=t0_ns)
+
+    t1_ns = pa.timestamp('ns')
+    data1_ns = np.array(data1 * 1000, dtype='int64')
+    a1_ns = pa.array(data1_ns, type=t1_ns)
+
+    expected = pa.Table.from_arrays([a0_ns, a1_ns, a2],
+                                    ['ts[ms]', 'ts[us]', 'ts[ns]'])
+
+    # int96 nanosecond timestamps produced upon request
+    filename = tempdir / 'explicit_int96_timestamps.parquet'
+    _write_table(table, filename, version='2.0',
+                 use_deprecated_int96_timestamps=True)
+    parquet_schema = pq.ParquetFile(filename).schema
+    for i in range(3):
+        assert parquet_schema.column(i).physical_type == 'INT96'
+    read_table = _read_table(filename)
+    assert read_table.equals(expected)
+
+    # int96 nanosecond timestamps implied by flavor 'spark'
+    filename = tempdir / 'spark_int96_timestamps.parquet'
+    _write_table(table, filename, version='2.0',
+                 flavor='spark')
+    parquet_schema = pq.ParquetFile(filename).schema
+    for i in range(3):
+        assert parquet_schema.column(i).physical_type == 'INT96'
+    read_table = _read_table(filename)
+    assert read_table.equals(expected)
 
 
 @pytest.mark.pandas
@@ -1037,6 +1050,64 @@ def test_list_of_datetime_time_roundtrip():
     _roundtrip_pandas_dataframe(df, write_kwargs={})
 
 
+@pytest.mark.pandas
+def test_parquet_version_timestamp_differences():
+    i_s = pd.Timestamp('2010-01-01').value / 1000000000  # := 1262304000
+
+    d_s = np.arange(i_s, i_s + 10, 1, dtype='int64')
+    d_ms = d_s * 1000
+    d_us = d_ms * 1000
+    d_ns = d_us * 1000
+
+    a_s = pa.array(d_s, type=pa.timestamp('s'))
+    a_ms = pa.array(d_ms, type=pa.timestamp('ms'))
+    a_us = pa.array(d_us, type=pa.timestamp('us'))
+    a_ns = pa.array(d_ns, type=pa.timestamp('ns'))
+
+    names = ['ts:s', 'ts:ms', 'ts:us', 'ts:ns']
+    table = pa.Table.from_arrays([a_s, a_ms, a_us, a_ns], names)
+
+    # Using Parquet version 1.0, seconds should be coerced to milliseconds
+    # and nanoseconds should be coerced to microseconds by default
+    expected = pa.Table.from_arrays([a_ms, a_ms, a_us, a_us], names)
+    _check_roundtrip(table, expected)
+
+    # Using Parquet version 2.0, seconds should be coerced to milliseconds
+    # and nanoseconds should be retained by default
+    expected = pa.Table.from_arrays([a_ms, a_ms, a_us, a_ns], names)
+    _check_roundtrip(table, expected, version='2.0')
+
+    # Using Parquet version 1.0, coercing to milliseconds or microseconds
+    # is allowed
+    expected = pa.Table.from_arrays([a_ms, a_ms, a_ms, a_ms], names)
+    _check_roundtrip(table, expected, coerce_timestamps='ms')
+
+    # Using Parquet version 2.0, coercing to milliseconds or microseconds
+    # is allowed
+    expected = pa.Table.from_arrays([a_us, a_us, a_us, a_us], names)
+    _check_roundtrip(table, expected, version='2.0', coerce_timestamps='us')
+
+    # TODO: after pyarrow allows coerce_timestamps='ns', tests like the
+    # following should pass ...
+
+    # Using Parquet version 1.0, coercing to nanoseconds is not allowed
+    # expected = None
+    # with pytest.raises(NotImplementedError):
+    #     _roundtrip_table(table, coerce_timestamps='ns')
+
+    # Using Parquet version 2.0, coercing to nanoseconds is allowed
+    # expected = pa.Table.from_arrays([a_ns, a_ns, a_ns, a_ns], names)
+    # _check_roundtrip(table, expected, version='2.0', coerce_timestamps='ns')
+
+    # For either Parquet version, coercing to nanoseconds is allowed
+    # if Int96 storage is used
+    expected = pa.Table.from_arrays([a_ns, a_ns, a_ns, a_ns], names)
+    _check_roundtrip(table, expected,
+                     use_deprecated_int96_timestamps=True)
+    _check_roundtrip(table, expected, version='2.0',
+                     use_deprecated_int96_timestamps=True)
+
+
 def test_large_list_records():
     # This was fixed in PARQUET-1100
 
@@ -2080,6 +2151,33 @@ def test_write_error_deletes_incomplete_file(tempdir):
     assert not filename.exists()
 
 
+@pytest.mark.pandas
+def test_noncoerced_nanoseconds_written_without_exception(tempdir):
+    # ARROW-1957: the Parquet version 2.0 writer preserves Arrow
+    # nanosecond timestamps by default
+    n = 9
+    df = pd.DataFrame({'x': range(n)},
+                      index=pd.DatetimeIndex(start='2017-01-01',
+                      freq='1n',
+                      periods=n))
+    tb = pa.Table.from_pandas(df)
+
+    filename = tempdir / 'written.parquet'
+    try:
+        pq.write_table(tb, filename, version='2.0')
+    except Exception:
+        pass
+    assert filename.exists()
+
+    recovered_table = pq.read_table(filename)
+    assert tb.equals(recovered_table)
+
+    # Loss of data thru coercion (without explicit override) still an error
+    filename = tempdir / 'not_written.parquet'
+    with pytest.raises(ValueError):
+        pq.write_table(tb, filename, coerce_timestamps='ms', version='2.0')
+
+
 def test_read_non_existent_file(tempdir):
     path = 'non-existent-file.parquet'
     try: