You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by we...@apache.org on 2016/12/20 21:18:20 UTC

parquet-cpp git commit: PARQUET-805: Read Int96 into Arrow Timestamp(ns)

Repository: parquet-cpp
Updated Branches:
  refs/heads/master a74c3016a -> 16466b109


PARQUET-805: Read Int96 into Arrow Timestamp(ns)

Author: Uwe L. Korn <uw...@xhochy.com>

Closes #204 from xhochy/PARQUET-805 and squashes the following commits:

895dc30 [Uwe L. Korn] Add missing return type
a2f7f5b [Uwe L. Korn] Incorporate review
f2255a3 [Uwe L. Korn] PARQUET-805: Read Int96 into Arrow Timestamp(ns)


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/16466b10
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/16466b10
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/16466b10

Branch: refs/heads/master
Commit: 16466b109e87792786d0612fb9f94fad39d13d6c
Parents: a74c301
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Tue Dec 20 16:18:13 2016 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue Dec 20 16:18:13 2016 -0500

----------------------------------------------------------------------
 src/parquet/arrow/arrow-reader-writer-test.cc | 52 ++++++++++++++++++++++
 src/parquet/arrow/arrow-schema-test.cc        | 11 +++--
 src/parquet/arrow/reader.cc                   | 51 ++++++++++++++++++++-
 src/parquet/arrow/schema.cc                   |  7 +--
 4 files changed, 111 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/16466b10/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 25ba457..a8a5db0 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -391,6 +391,58 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
   this->ReadAndCheckSingleColumnTable(values);
 }
 
+using TestInt96ParquetIO = TestParquetIO<::arrow::TimestampType>;
+
+TEST_F(TestInt96ParquetIO, ReadIntoTimestamp) {
+  // This test explicitly tests the conversion from an Impala-style timestamp
+  // to a nanoseconds-since-epoch one.
+
+  // 2nd January 1970, 11:35min 145738543ns
+  Int96 day;
+  day.value[2] = 2440589l;
+  int64_t seconds = ((1 * 24 + 11) * 60 + 35) * 60;
+  *(reinterpret_cast<int64_t*>(&(day.value))) =
+      seconds * 1000l * 1000l * 1000l + 145738543;
+  // Compute the corresponding nanosecond timestamp
+  struct tm datetime = {0};
+  datetime.tm_year = 70;
+  datetime.tm_mon = 0;
+  datetime.tm_mday = 2;
+  datetime.tm_hour = 11;
+  datetime.tm_min = 35;
+  struct tm epoch = {0};
+  epoch.tm_year = 70;
+  epoch.tm_mday = 1;
+  // Nanoseconds since the epoch
+  int64_t val = lrint(difftime(mktime(&datetime), mktime(&epoch))) * 1000000000;
+  val += 145738543;
+
+  std::vector<std::shared_ptr<schema::Node>> fields(
+      {schema::PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96)});
+  std::shared_ptr<schema::GroupNode> schema = std::static_pointer_cast<GroupNode>(
+      schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
+
+  // We cannot write this column with Arrow, so we have to use the plain parquet-cpp API
+  // to write an Int96 file.
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  auto writer = ParquetFileWriter::Open(this->sink_, schema);
+  RowGroupWriter* rg_writer = writer->AppendRowGroup(1);
+  ColumnWriter* c_writer = rg_writer->NextColumn();
+  auto typed_writer = dynamic_cast<TypedColumnWriter<Int96Type>*>(c_writer);
+  ASSERT_NE(typed_writer, nullptr);
+  typed_writer->WriteBatch(1, nullptr, nullptr, &day);
+  c_writer->Close();
+  rg_writer->Close();
+  writer->Close();
+
+  ::arrow::TimestampBuilder builder(
+      default_memory_pool(), ::arrow::timestamp(::arrow::TimeUnit::NANO));
+  builder.Append(val);
+  std::shared_ptr<Array> values;
+  ASSERT_OK(builder.Finish(&values));
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
 using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>;
 
 TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/16466b10/src/parquet/arrow/arrow-schema-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index 360680f..3d07561 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -47,6 +47,8 @@ const auto DOUBLE = std::make_shared<::arrow::DoubleType>();
 const auto UTF8 = std::make_shared<::arrow::StringType>();
 const auto TIMESTAMP_MS =
     std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI);
+const auto TIMESTAMP_NS =
+    std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::NANO);
 // TODO: This requires parquet-cpp implementing the MICROS enum value
 // const auto TIMESTAMP_US = std::make_shared<TimestampType>(TimestampType::Unit::MICRO);
 const auto BINARY =
@@ -98,9 +100,9 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
       ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
   arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
 
-  // parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
-  //     ParquetType::INT64, LogicalType::TIMESTAMP_MICROS));
-  // arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_US, false));
+  parquet_fields.push_back(
+      PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96));
+  arrow_fields.push_back(std::make_shared<Field>("timestamp96", TIMESTAMP_NS, false));
 
   parquet_fields.push_back(
       PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
@@ -339,9 +341,6 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
 TEST_F(TestConvertParquetSchema, UnsupportedThings) {
   std::vector<NodePtr> unsupported_nodes;
 
-  unsupported_nodes.push_back(
-      PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96));
-
   unsupported_nodes.push_back(PrimitiveNode::Make(
       "int32", Repetition::OPTIONAL, ParquetType::INT32, LogicalType::DATE));
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/16466b10/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index f2d4639..2efa806 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -18,6 +18,7 @@
 #include "parquet/arrow/reader.h"
 
 #include <algorithm>
+#include <chrono>
 #include <queue>
 #include <string>
 #include <vector>
@@ -44,6 +45,15 @@ using ParquetReader = parquet::ParquetFileReader;
 namespace parquet {
 namespace arrow {
 
+constexpr int64_t kJulianToUnixEpochDays = 2440588L;
+constexpr int64_t kNanosecondsInADay = 86400L * 1000L * 1000L * 1000L;
+
+static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timestamp) {
+  int64_t days_since_epoch = impala_timestamp.value[2] - kJulianToUnixEpochDays;
+  int64_t nanoseconds = *(reinterpret_cast<const int64_t*>(&(impala_timestamp.value)));
+  return days_since_epoch * kNanosecondsInADay + nanoseconds;
+}
+
 template <typename ArrowType>
 struct ArrowTypeTraits {
   typedef ::arrow::NumericArray<ArrowType> array_type;
@@ -239,6 +249,15 @@ void FlatColumnReader::Impl::ReadNonNullableBatch(
 }
 
 template <>
+void FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
+    Int96* values, int64_t values_read) {
+  int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
+  for (int64_t i = 0; i < values_read; i++) {
+    out_ptr[i] = impala_timestamp_to_nanoseconds(values[i]);
+  }
+}
+
+template <>
 void FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
     bool* values, int64_t values_read) {
   for (int64_t i = 0; i < values_read; i++) {
@@ -266,6 +285,22 @@ void FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels,
 }
 
 template <>
+void FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::TimestampType, Int96Type>(
+    const int16_t* def_levels, Int96* values, int64_t values_read, int64_t levels_read) {
+  auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
+  int values_idx = 0;
+  for (int64_t i = 0; i < levels_read; i++) {
+    if (def_levels[i] < descr_->max_definition_level()) {
+      null_count_++;
+    } else {
+      ::arrow::BitUtil::SetBit(valid_bits_ptr_, valid_bits_idx_);
+      data_ptr[valid_bits_idx_] = impala_timestamp_to_nanoseconds(values[values_idx++]);
+    }
+    valid_bits_idx_++;
+  }
+}
+
+template <>
 void FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>(
     const int16_t* def_levels, bool* values, int64_t values_read, int64_t levels_read) {
   int values_idx = 0;
@@ -518,7 +553,21 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>*
     TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
     TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
     TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
-    TYPED_BATCH_CASE(TIMESTAMP, ::arrow::TimestampType, Int64Type)
+    case ::arrow::Type::TIMESTAMP: {
+      ::arrow::TimestampType* timestamp_type =
+          static_cast<::arrow::TimestampType*>(field_->type.get());
+      switch (timestamp_type->unit) {
+        case ::arrow::TimeUnit::MILLI:
+          return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
+          break;
+        case ::arrow::TimeUnit::NANO:
+          return TypedReadBatch<::arrow::TimestampType, Int96Type>(batch_size, out);
+          break;
+        default:
+          return Status::NotImplemented("TimeUnit not supported");
+      }
+      break;
+    }
     default:
       return Status::NotImplemented(field_->type->ToString());
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/16466b10/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 2875dc6..e578ec2 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -58,6 +58,8 @@ const auto DOUBLE = std::make_shared<::arrow::DoubleType>();
 const auto UTF8 = std::make_shared<::arrow::StringType>();
 const auto TIMESTAMP_MS =
     std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI);
+const auto TIMESTAMP_NS =
+    std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::NANO);
 const auto BINARY =
     std::make_shared<::arrow::ListType>(std::make_shared<::arrow::Field>("", UINT8));
 
@@ -162,9 +164,8 @@ Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) {
       RETURN_NOT_OK(FromInt64(primitive, out));
       break;
     case ParquetType::INT96:
-      // TODO: Do we have that type in Arrow?
-      // type = TypePtr(new Int96Type());
-      return Status::NotImplemented("int96");
+      *out = TIMESTAMP_NS;
+      break;
     case ParquetType::FLOAT:
       *out = FLOAT;
       break;