You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2017/07/16 21:43:48 UTC

parquet-cpp git commit: PARQUET-1035: Write Int96 from Arrow timestamp(ns)

Repository: parquet-cpp
Updated Branches:
  refs/heads/master 178ef72a4 -> e998dfb40


PARQUET-1035: Write Int96 from Arrow timestamp(ns)

Closes #356

cc @c-nichols

Author: Colin Nichols <ni...@gmail.com>
Author: Uwe L. Korn <uw...@apache.org>

Closes #371 from xhochy/PARQUET-1035 and squashes the following commits:

20e28e3 [Uwe L. Korn] Make support for INT96 optional
dd197fc [Colin Nichols] Move timestamp conversion function to header file
dfc9a00 [Colin Nichols] PARQUET-1035 Write Int96 from Arrow timestamp(ns)


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/e998dfb4
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/e998dfb4
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/e998dfb4

Branch: refs/heads/master
Commit: e998dfb40403197dbb8efa53d077449c81552d26
Parents: 178ef72
Author: Colin Nichols <ni...@gmail.com>
Authored: Sun Jul 16 17:43:43 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sun Jul 16 17:43:43 2017 -0400

----------------------------------------------------------------------
 src/parquet/arrow/arrow-reader-writer-test.cc |  63 +++++++--
 src/parquet/arrow/schema.cc                   |  41 ++++--
 src/parquet/arrow/schema.h                    |   6 +-
 src/parquet/arrow/writer.cc                   | 154 +++++++++++++++++++--
 src/parquet/arrow/writer.h                    |  84 ++++++++++-
 5 files changed, 306 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/e998dfb4/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 746ce14..4424ea6 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -290,19 +290,23 @@ template <typename T>
 using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
 
 void WriteTableToBuffer(const std::shared_ptr<Table>& table, int num_threads,
-    int64_t row_group_size, std::shared_ptr<Buffer>* out) {
+    int64_t row_group_size,
+    const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
+    std::shared_ptr<Buffer>* out) {
   auto sink = std::make_shared<InMemoryOutputStream>();
 
   ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink,
-      row_group_size, default_writer_properties()));
+      row_group_size, default_writer_properties(), arrow_properties));
   *out = sink->GetBuffer();
 }
 
 void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
     int64_t row_group_size, const std::vector<int>& column_subset,
-    std::shared_ptr<Table>* out) {
+    std::shared_ptr<Table>* out,
+    const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+        default_arrow_writer_properties()) {
   std::shared_ptr<Buffer> buffer;
-  WriteTableToBuffer(table, num_threads, row_group_size, &buffer);
+  WriteTableToBuffer(table, num_threads, row_group_size, arrow_properties, &buffer);
 
   std::unique_ptr<FileReader> reader;
   ASSERT_OK_NO_THROW(
@@ -919,7 +923,7 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
   this->CheckSingleColumnRequiredTableRead(4);
 }
 
-void MakeDateTimeTypesTable(std::shared_ptr<Table>* out) {
+void MakeDateTimeTypesTable(std::shared_ptr<Table>* out, bool nanos_as_micros = false) {
   using ::arrow::ArrayFromVector;
 
   std::vector<bool> is_valid = {true, true, true, false, true, true};
@@ -928,26 +932,41 @@ void MakeDateTimeTypesTable(std::shared_ptr<Table>* out) {
   auto f0 = field("f0", ::arrow::date32());
   auto f1 = field("f1", ::arrow::timestamp(TimeUnit::MILLI));
   auto f2 = field("f2", ::arrow::timestamp(TimeUnit::MICRO));
-  auto f3 = field("f3", ::arrow::time32(TimeUnit::MILLI));
-  auto f4 = field("f4", ::arrow::time64(TimeUnit::MICRO));
-  std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1, f2, f3, f4}));
+  std::shared_ptr<::arrow::Field> f3;
+  if (nanos_as_micros) {
+    f3 = field("f3", ::arrow::timestamp(TimeUnit::MICRO));
+  } else {
+    f3 = field("f3", ::arrow::timestamp(TimeUnit::NANO));
+  }
+  auto f4 = field("f4", ::arrow::time32(TimeUnit::MILLI));
+  auto f5 = field("f5", ::arrow::time64(TimeUnit::MICRO));
+  std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1, f2, f3, f4, f5}));
 
   std::vector<int32_t> t32_values = {
       1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000};
   std::vector<int64_t> t64_values = {1489269000000, 1489270000000, 1489271000000,
       1489272000000, 1489272000000, 1489273000000};
+  std::vector<int64_t> t64_us_values = {
+      1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000};
 
-  std::shared_ptr<Array> a0, a1, a2, a3, a4;
+  std::shared_ptr<Array> a0, a1, a2, a3, a4, a5;
   ArrayFromVector<::arrow::Date32Type, int32_t>(f0->type(), is_valid, t32_values, &a0);
   ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_values, &a1);
   ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_values, &a2);
-  ArrayFromVector<::arrow::Time32Type, int32_t>(f3->type(), is_valid, t32_values, &a3);
-  ArrayFromVector<::arrow::Time64Type, int64_t>(f4->type(), is_valid, t64_values, &a4);
+  if (nanos_as_micros) {
+    ArrayFromVector<::arrow::TimestampType, int64_t>(
+        f3->type(), is_valid, t64_us_values, &a3);
+  } else {
+    ArrayFromVector<::arrow::TimestampType, int64_t>(
+        f3->type(), is_valid, t64_values, &a3);
+  }
+  ArrayFromVector<::arrow::Time32Type, int32_t>(f4->type(), is_valid, t32_values, &a4);
+  ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_values, &a5);
 
   std::vector<std::shared_ptr<::arrow::Column>> columns = {
       std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1),
       std::make_shared<Column>("f2", a2), std::make_shared<Column>("f3", a3),
-      std::make_shared<Column>("f4", a4)};
+      std::make_shared<Column>("f4", a4), std::make_shared<Column>("f5", a5)};
   *out = std::make_shared<::arrow::Table>(schema, columns);
 }
 
@@ -955,8 +974,17 @@ TEST(TestArrowReadWrite, DateTimeTypes) {
   std::shared_ptr<Table> table;
   MakeDateTimeTypesTable(&table);
 
+  // Use deprecated INT96 type
   std::shared_ptr<Table> result;
+  DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result,
+      ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build());
+
+  ASSERT_TRUE(table->Equals(*result));
+
+  // Cast nanoseconds to microseconds and use INT64 physical type
   DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
+  std::shared_ptr<Table> expected;
+  MakeDateTimeTypesTable(&table, true);
 
   ASSERT_TRUE(table->Equals(*result));
 }
@@ -1050,7 +1078,7 @@ TEST(TestArrowReadWrite, ReadSingleRowGroup) {
   MakeDoubleTable(num_columns, num_rows, 1, &table);
 
   std::shared_ptr<Buffer> buffer;
-  WriteTableToBuffer(table, 1, num_rows / 2, &buffer);
+  WriteTableToBuffer(table, 1, num_rows / 2, default_arrow_writer_properties(), &buffer);
 
   std::unique_ptr<FileReader> reader;
   ASSERT_OK_NO_THROW(
@@ -1449,6 +1477,15 @@ TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) {
 INSTANTIATE_TEST_CASE_P(Repetition_type, TestNestedSchemaRead,
     ::testing::Values(Repetition::REQUIRED, Repetition::OPTIONAL));
 
+TEST(TestImpalaConversion, NanosecondToImpala) {
+  // 2017-06-20 16:32:56.123456789 UTC; expected = {day-nanos as two words, Julian day}
+  int64_t nanoseconds = INT64_C(1497976376123456789);
+  Int96 expected = {{UINT32_C(632093973), UINT32_C(13871), UINT32_C(2457925)}};
+  Int96 calculated;
+  internal::NanosecondsToImpalaTimestamp(nanoseconds, &calculated);
+  ASSERT_EQ(expected, calculated);
+}
+
 TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) {
   // PARQUET-995
   const char* data_dir = std::getenv("PARQUET_TEST_DATA");

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/e998dfb4/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 2a4ddcd..d14ee4f 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -381,11 +381,13 @@ Status FromParquetSchema(
 }
 
 Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::string& name,
-    bool nullable, const WriterProperties& properties, NodePtr* out) {
+    bool nullable, bool support_int96_nanoseconds, const WriterProperties& properties,
+    NodePtr* out) {
   Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;
 
   NodePtr element;
-  RETURN_NOT_OK(FieldToNode(type->value_field(), properties, &element));
+  RETURN_NOT_OK(
+      FieldToNode(type->value_field(), properties, &element, support_int96_nanoseconds));
 
   NodePtr list = GroupNode::Make("list", Repetition::REPEATED, {element});
   *out = GroupNode::Make(name, repetition, {list}, LogicalType::LIST);
@@ -393,13 +395,14 @@ Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::str
 }
 
 Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
-    const std::string& name, bool nullable, const WriterProperties& properties,
-    NodePtr* out) {
+    const std::string& name, bool nullable, bool support_int96_nanoseconds,
+    const WriterProperties& properties, NodePtr* out) {
   Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;
 
   std::vector<NodePtr> children(type->num_children());
   for (int i = 0; i < type->num_children(); i++) {
-    RETURN_NOT_OK(FieldToNode(type->child(i), properties, &children[i]));
+    RETURN_NOT_OK(
+        FieldToNode(type->child(i), properties, &children[i], support_int96_nanoseconds));
   }
 
   *out = GroupNode::Make(name, repetition, children);
@@ -407,7 +410,7 @@ Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
 }
 
 Status FieldToNode(const std::shared_ptr<Field>& field,
-    const WriterProperties& properties, NodePtr* out) {
+    const WriterProperties& properties, NodePtr* out, bool support_int96_nanoseconds) {
   LogicalType::type logical_type = LogicalType::NONE;
   ParquetType::type type;
   Repetition::type repetition =
@@ -486,14 +489,24 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
     case ArrowType::TIMESTAMP: {
       auto timestamp_type = static_cast<::arrow::TimestampType*>(field->type().get());
       auto unit = timestamp_type->unit();
-      type = ParquetType::INT64;
       if (unit == ::arrow::TimeUnit::MILLI) {
+        type = ParquetType::INT64;
         logical_type = LogicalType::TIMESTAMP_MILLIS;
       } else if (unit == ::arrow::TimeUnit::MICRO) {
+        type = ParquetType::INT64;
         logical_type = LogicalType::TIMESTAMP_MICROS;
+      } else if (unit == ::arrow::TimeUnit::NANO) {
+        if (support_int96_nanoseconds) {
+          type = ParquetType::INT96;
+          // No corresponding logical type
+        } else {
+          type = ParquetType::INT64;
+          logical_type = LogicalType::TIMESTAMP_MICROS;
+        }
       } else {
         return Status::NotImplemented(
-            "Only MILLI and MICRO units supported for Arrow timestamps with Parquet.");
+            "Only MILLI, MICRO, and NANOS units supported for Arrow timestamps with "
+            "Parquet.");
       }
     } break;
     case ArrowType::TIME32:
@@ -510,11 +523,13 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
     } break;
     case ArrowType::STRUCT: {
       auto struct_type = std::static_pointer_cast<::arrow::StructType>(field->type());
-      return StructToNode(struct_type, field->name(), field->nullable(), properties, out);
+      return StructToNode(struct_type, field->name(), field->nullable(),
+          support_int96_nanoseconds, properties, out);
     } break;
     case ArrowType::LIST: {
       auto list_type = std::static_pointer_cast<::arrow::ListType>(field->type());
-      return ListToNode(list_type, field->name(), field->nullable(), properties, out);
+      return ListToNode(list_type, field->name(), field->nullable(),
+          support_int96_nanoseconds, properties, out);
     } break;
     default:
       // TODO: LIST, DENSE_UNION, SPARE_UNION, JSON_SCALAR, DECIMAL, DECIMAL_TEXT, VARCHAR
@@ -525,10 +540,12 @@ Status FieldToNode(const std::shared_ptr<Field>& field,
 }
 
 Status ToParquetSchema(const ::arrow::Schema* arrow_schema,
-    const WriterProperties& properties, std::shared_ptr<SchemaDescriptor>* out) {
+    const WriterProperties& properties, std::shared_ptr<SchemaDescriptor>* out,
+    bool support_int96_nanoseconds) {
   std::vector<NodePtr> nodes(arrow_schema->num_fields());
   for (int i = 0; i < arrow_schema->num_fields(); i++) {
-    RETURN_NOT_OK(FieldToNode(arrow_schema->field(i), properties, &nodes[i]));
+    RETURN_NOT_OK(FieldToNode(
+        arrow_schema->field(i), properties, &nodes[i], support_int96_nanoseconds));
   }
 
   NodePtr schema = GroupNode::Make("schema", Repetition::REQUIRED, nodes);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/e998dfb4/src/parquet/arrow/schema.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h
index 69ca661..d4f5ea3 100644
--- a/src/parquet/arrow/schema.h
+++ b/src/parquet/arrow/schema.h
@@ -66,10 +66,12 @@ namespace arrow {
     const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out);
 
 ::arrow::Status PARQUET_EXPORT FieldToNode(const std::shared_ptr<::arrow::Field>& field,
-    const WriterProperties& properties, schema::NodePtr* out);
+    const WriterProperties& properties, schema::NodePtr* out,
+    bool support_int96_nanoseconds = false);
 
 ::arrow::Status PARQUET_EXPORT ToParquetSchema(const ::arrow::Schema* arrow_schema,
-    const WriterProperties& properties, std::shared_ptr<SchemaDescriptor>* out);
+    const WriterProperties& properties, std::shared_ptr<SchemaDescriptor>* out,
+    bool support_int96_nanoseconds = false);
 
 }  // namespace arrow
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/e998dfb4/src/parquet/arrow/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 104c040..c562b27 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -36,6 +36,7 @@ using arrow::Int16Array;
 using arrow::Int16Builder;
 using arrow::Field;
 using arrow::MemoryPool;
+using arrow::NumericArray;
 using arrow::PoolBuffer;
 using arrow::PrimitiveArray;
 using arrow::ListArray;
@@ -52,6 +53,12 @@ namespace arrow {
 
 namespace BitUtil = ::arrow::BitUtil;
 
+std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties() {
+  static std::shared_ptr<ArrowWriterProperties> default_writer_properties =
+      ArrowWriterProperties::Builder().build();  // built from an untouched Builder
+  return default_writer_properties;  // every call returns the same shared instance
+}
+
 class LevelBuilder {
  public:
   explicit LevelBuilder(MemoryPool* pool)
@@ -241,13 +248,18 @@ Status LevelBuilder::VisitInline(const Array& array) {
 
 class FileWriter::Impl {
  public:
-  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
+  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+      const std::shared_ptr<ArrowWriterProperties>& arrow_properties);
 
   Status NewRowGroup(int64_t chunk_size);
   template <typename ParquetType, typename ArrowType>
   Status TypedWriteBatch(ColumnWriter* writer, const std::shared_ptr<Array>& data,
       int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels);
 
+  Status TypedWriteBatchConvertedNanos(ColumnWriter* writer,
+      const std::shared_ptr<Array>& data, int64_t num_levels, const int16_t* def_levels,
+      const int16_t* rep_levels);
+
   template <typename ParquetType, typename ArrowType>
   Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* writer,
       const ArrowType& type, int64_t num_values, int64_t num_levels,
@@ -274,13 +286,16 @@ class FileWriter::Impl {
   PoolBuffer data_buffer_;
   std::unique_ptr<ParquetFileWriter> writer_;
   RowGroupWriter* row_group_writer_;
+  std::shared_ptr<ArrowWriterProperties> arrow_properties_;
 };
 
-FileWriter::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer)
+FileWriter::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+    const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
     : pool_(pool),
       data_buffer_(pool),
       writer_(std::move(writer)),
-      row_group_writer_(nullptr) {}
+      row_group_writer_(nullptr),
+      arrow_properties_(arrow_properties) {}
 
 Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
   if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
@@ -361,6 +376,25 @@ Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::Time32Type>(
   return Status::OK();
 }
 
+template <>
+Status FileWriter::Impl::WriteNonNullableBatch<Int96Type, ::arrow::TimestampType>(
+    TypedColumnWriter<Int96Type>* writer, const ::arrow::TimestampType& type,
+    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels, const int64_t* data_ptr) {
+  // Converts NANO timestamps to Int96 in data_buffer_ and writes them as one batch.
+  if (type.unit() != TimeUnit::NANO) {
+    return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
+  }
+  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(Int96)));
+  auto buffer_ptr = reinterpret_cast<Int96*>(data_buffer_.mutable_data());
+  for (int64_t i = 0; i < num_values; i++) {  // int64_t index: matches num_values
+    internal::NanosecondsToImpalaTimestamp(data_ptr[i], buffer_ptr + i);
+  }
+  PARQUET_CATCH_NOT_OK(
+      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+  return Status::OK();
+}
+
 #define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)         \
   template <>                                                              \
   Status FileWriter::Impl::WriteNonNullableBatch<ParquetType, ArrowType>(  \
@@ -455,6 +489,32 @@ Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::Time32Type>(
   return Status::OK();
 }
 
+template <>
+Status FileWriter::Impl::WriteNullableBatch<Int96Type, ::arrow::TimestampType>(
+    TypedColumnWriter<Int96Type>* writer, const ::arrow::TimestampType& type,
+    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
+    const int64_t* data_ptr) {
+  // Nullable variant: only slots whose validity bit is set are converted to Int96;
+  // the remaining slots of data_buffer_ are left as they are for WriteBatchSpaced.
+  if (type.unit() != TimeUnit::NANO) {
+    return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
+  }
+  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(Int96)));
+  auto buffer_ptr = reinterpret_cast<Int96*>(data_buffer_.mutable_data());
+  INIT_BITSET(valid_bits, static_cast<int>(valid_bits_offset));
+
+  for (int64_t i = 0; i < num_values; i++) {  // int64_t index: matches num_values
+    if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
+      internal::NanosecondsToImpalaTimestamp(data_ptr[i], buffer_ptr + i);
+    }
+    READ_NEXT_BITSET(valid_bits);
+  }
+  PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
+      num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
+  return Status::OK();
+}
+
 #define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                        \
   template <>                                                                          \
   Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>(                 \
@@ -475,6 +535,40 @@ NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
 NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
 NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
 
+Status FileWriter::Impl::TypedWriteBatchConvertedNanos(ColumnWriter* column_writer,
+    const std::shared_ptr<Array>& array, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels) {
+  // data_buffer_ is usable as scratch here because timestamps take the fast path.
+  // NOTE(review): data_ptr below ignores data->offset(); confirm for sliced arrays.
+  RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(int64_t)));
+  int64_t* data_buffer_ptr = reinterpret_cast<int64_t*>(data_buffer_.mutable_data());
+
+  auto data = static_cast<const NumericArray<::arrow::TimestampType>*>(array.get());
+  auto data_ptr = reinterpret_cast<const int64_t*>(data->values()->data());
+  auto writer = reinterpret_cast<TypedColumnWriter<Int64Type>*>(column_writer);
+
+  // Convert nanoseconds to microseconds; the integer division drops the remainder
+  for (int64_t i = 0; i < array->length(); i++) {
+    data_buffer_ptr[i] = data_ptr[i] / 1000;
+  }
+
+  std::shared_ptr<::arrow::TimestampType> type =
+      std::static_pointer_cast<::arrow::TimestampType>(
+          ::arrow::timestamp(::arrow::TimeUnit::MICRO));
+  if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
+    // no nulls to skip: hand the converted values to the MICRO timestamp writer
+    RETURN_NOT_OK((WriteNonNullableBatch<Int64Type, ::arrow::TimestampType>(writer, *type,
+        array->length(), num_levels, def_levels, rep_levels, data_buffer_ptr)));
+  } else {
+    const uint8_t* valid_bits = data->null_bitmap_data();
+    RETURN_NOT_OK((WriteNullableBatch<Int64Type, ::arrow::TimestampType>(writer, *type,
+        array->length(), num_levels, def_levels, rep_levels, valid_bits, data->offset(),
+        data_buffer_ptr)));
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());  // column writer is closed after this batch
+  return Status::OK();
+}
+
 // This specialization seems quite similar but it significantly differs in two points:
 // * offset is added at the most latest time to the pointer as we have sub-byte access
 // * Arrow data is stored bitwise thus we cannot use std::copy to transform from
@@ -645,6 +739,21 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
       }
     }
       WRITE_BATCH_CASE(NA, NullType, Int32Type)
+    case ::arrow::Type::TIMESTAMP: {
+      auto timestamp_type =
+          static_cast<::arrow::TimestampType*>(values_array->type().get());
+      if (timestamp_type->unit() == ::arrow::TimeUnit::NANO &&
+          arrow_properties_->support_deprecated_int96_timestamps()) {
+        return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(
+            column_writer, values_array, num_levels, def_levels, rep_levels);
+      } else if (timestamp_type->unit() == ::arrow::TimeUnit::NANO) {
+        return TypedWriteBatchConvertedNanos(
+            column_writer, values_array, num_levels, def_levels, rep_levels);
+      } else {
+        return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(
+            column_writer, values_array, num_levels, def_levels, rep_levels);
+      }
+    }
       WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)
       WRITE_BATCH_CASE(INT8, Int8Type, Int32Type)
       WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type)
@@ -660,7 +769,6 @@ Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
       WRITE_BATCH_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
       WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type)
       WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
-      WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type)
       WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type)
       WRITE_BATCH_CASE(TIME64, Time64Type, Int64Type)
     default:
@@ -688,22 +796,32 @@ MemoryPool* FileWriter::memory_pool() const {
 
 FileWriter::~FileWriter() {}
 
-FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer)
-    : impl_(new FileWriter::Impl(pool, std::move(writer))) {}
+FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+    const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
+    : impl_(new FileWriter::Impl(pool, std::move(writer), arrow_properties)) {}
 
 Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
     const std::shared_ptr<OutputStream>& sink,
     const std::shared_ptr<WriterProperties>& properties,
     std::unique_ptr<FileWriter>* writer) {
+  return Open(schema, pool, sink, properties, default_arrow_writer_properties(), writer);
+}
+
+Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+    const std::shared_ptr<OutputStream>& sink,
+    const std::shared_ptr<WriterProperties>& properties,
+    const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
+    std::unique_ptr<FileWriter>* writer) {
   std::shared_ptr<SchemaDescriptor> parquet_schema;
-  RETURN_NOT_OK(ToParquetSchema(&schema, *properties, &parquet_schema));
+  RETURN_NOT_OK(ToParquetSchema(&schema, *properties, &parquet_schema,
+      arrow_properties->support_deprecated_int96_timestamps()));
 
   auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
 
   std::unique_ptr<ParquetFileWriter> base_writer =
       ParquetFileWriter::Open(sink, schema_node, properties, schema.metadata());
 
-  writer->reset(new FileWriter(pool, std::move(base_writer)));
+  writer->reset(new FileWriter(pool, std::move(base_writer), arrow_properties));
   return Status::OK();
 }
 
@@ -715,6 +833,15 @@ Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool
   return Open(schema, pool, wrapper, properties, writer);
 }
 
+Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+    const std::shared_ptr<::arrow::io::OutputStream>& sink,
+    const std::shared_ptr<WriterProperties>& properties,
+    const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
+    std::unique_ptr<FileWriter>* writer) {
+  auto wrapper = std::make_shared<ArrowOutputStream>(sink);  // wrap the Arrow stream
+  return Open(schema, pool, wrapper, properties, arrow_properties, writer);
+}
+
 Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
   // TODO(ARROW-232) Support writing chunked arrays.
   for (int i = 0; i < table.num_columns(); i++) {
@@ -742,18 +869,21 @@ Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
 
 Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
     const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
-    const std::shared_ptr<WriterProperties>& properties) {
+    const std::shared_ptr<WriterProperties>& properties,
+    const std::shared_ptr<ArrowWriterProperties>& arrow_properties) {
   std::unique_ptr<FileWriter> writer;
-  RETURN_NOT_OK(FileWriter::Open(*table.schema(), pool, sink, properties, &writer));
+  RETURN_NOT_OK(FileWriter::Open(
+      *table.schema(), pool, sink, properties, arrow_properties, &writer));
   RETURN_NOT_OK(writer->WriteTable(table, chunk_size));
   return writer->Close();
 }
 
 Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
     const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size,
-    const std::shared_ptr<WriterProperties>& properties) {
+    const std::shared_ptr<WriterProperties>& properties,
+    const std::shared_ptr<ArrowWriterProperties>& arrow_properties) {
   auto wrapper = std::make_shared<ArrowOutputStream>(sink);
-  return WriteTable(table, pool, wrapper, chunk_size, properties);
+  return WriteTable(table, pool, wrapper, chunk_size, properties, arrow_properties);
 }
 
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/e998dfb4/src/parquet/arrow/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h
index 3916298..4f7d2b4 100644
--- a/src/parquet/arrow/writer.h
+++ b/src/parquet/arrow/writer.h
@@ -40,6 +40,43 @@ class Table;
 namespace parquet {
 namespace arrow {
 
+class PARQUET_EXPORT ArrowWriterProperties {  // Arrow-side write options, see Builder
+ public:
+  class Builder {
+   public:
+    Builder() : write_nanos_as_int96_(false) {}
+    virtual ~Builder() {}
+    // Turns INT96 output off (the state a new Builder starts in); returns this.
+    Builder* disable_deprecated_int96_timestamps() {
+      write_nanos_as_int96_ = false;
+      return this;
+    }
+    // Turns INT96 output on; returns this so calls can be chained.
+    Builder* enable_deprecated_int96_timestamps() {
+      write_nanos_as_int96_ = true;
+      return this;
+    }
+    // Creates a properties object carrying the current setting.
+    std::shared_ptr<ArrowWriterProperties> build() {
+      return std::shared_ptr<ArrowWriterProperties>(
+          new ArrowWriterProperties(write_nanos_as_int96_));
+    }
+
+   private:
+    bool write_nanos_as_int96_;
+  };
+  // True when INT96 output was enabled on the Builder that created this object.
+  bool support_deprecated_int96_timestamps() const { return write_nanos_as_int96_; }
+
+ private:
+  explicit ArrowWriterProperties(bool write_nanos_as_int96)
+      : write_nanos_as_int96_(write_nanos_as_int96) {}
+
+  const bool write_nanos_as_int96_;  // fixed at construction
+};
+
+std::shared_ptr<ArrowWriterProperties> PARQUET_EXPORT default_arrow_writer_properties();
+
 /**
  * Iterative API:
  *  Start a new RowGroup/Chunk with NewRowGroup
@@ -47,7 +84,9 @@ namespace arrow {
  */
 class PARQUET_EXPORT FileWriter {
  public:
-  FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
+  FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+      const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+          default_arrow_writer_properties());
 
   static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
       const std::shared_ptr<OutputStream>& sink,
@@ -55,8 +94,20 @@ class PARQUET_EXPORT FileWriter {
       std::unique_ptr<FileWriter>* writer);
 
   static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+      const std::shared_ptr<OutputStream>& sink,
+      const std::shared_ptr<WriterProperties>& properties,
+      const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
+      std::unique_ptr<FileWriter>* writer);
+
+  static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+      const std::shared_ptr<::arrow::io::OutputStream>& sink,
+      const std::shared_ptr<WriterProperties>& properties,
+      std::unique_ptr<FileWriter>* writer);
+
+  static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
       const std::shared_ptr<::arrow::io::OutputStream>& sink,
       const std::shared_ptr<WriterProperties>& properties,
+      const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
       std::unique_ptr<FileWriter>* writer);
 
   /**
@@ -87,12 +138,39 @@ class PARQUET_EXPORT FileWriter {
 ::arrow::Status PARQUET_EXPORT WriteTable(const ::arrow::Table& table,
     ::arrow::MemoryPool* pool, const std::shared_ptr<OutputStream>& sink,
     int64_t chunk_size,
-    const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
+    const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
+    const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+        default_arrow_writer_properties());
 
 ::arrow::Status PARQUET_EXPORT WriteTable(const ::arrow::Table& table,
     ::arrow::MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink,
     int64_t chunk_size,
-    const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
+    const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
+    const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+        default_arrow_writer_properties());
+
+namespace internal {
+
+// Julian day number of 1970-01-01, and the number of nanoseconds in one day.
+constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588);
+constexpr int64_t kNanosecondsPerDay = INT64_C(86400000000000);
+
+/**
+ * Converts nanoseconds since the Unix epoch to Impala (Int96) format: value[0..1]
+ * hold the nanoseconds within the day (low word first), value[2] the Julian day.
+ * Pre-epoch input is floored; words are stored one by one (no int64_t aliasing).
+ */
+inline void NanosecondsToImpalaTimestamp(
+    const int64_t nanoseconds, Int96* impala_timestamp) {
+  int64_t days = nanoseconds / kNanosecondsPerDay;
+  int64_t day_nanos = nanoseconds % kNanosecondsPerDay;
+  if (day_nanos < 0) { --days; day_nanos += kNanosecondsPerDay; }
+  impala_timestamp->value[0] = static_cast<uint32_t>(day_nanos);
+  impala_timestamp->value[1] = static_cast<uint32_t>(day_nanos >> 32);
+  impala_timestamp->value[2] = static_cast<uint32_t>(days + kJulianEpochOffsetDays);
+}
+
+}  // namespace internal
 
 }  // namespace arrow
 }  // namespace parquet