You are viewing a plain-text version of this content; the hyperlink to its canonical page was not preserved in this copy.
Posted to commits@parquet.apache.org by we...@apache.org on 2016/12/20 21:18:20 UTC
parquet-cpp git commit: PARQUET-805: Read Int96 into Arrow Timestamp(ns)
Repository: parquet-cpp
Updated Branches:
refs/heads/master a74c3016a -> 16466b109
PARQUET-805: Read Int96 into Arrow Timestamp(ns)
Author: Uwe L. Korn <uw...@xhochy.com>
Closes #204 from xhochy/PARQUET-805 and squashes the following commits:
895dc30 [Uwe L. Korn] Add missing return type
a2f7f5b [Uwe L. Korn] Incorporate review
f2255a3 [Uwe L. Korn] PARQUET-805: Read Int96 into Arrow Timestamp(ns)
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/16466b10
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/16466b10
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/16466b10
Branch: refs/heads/master
Commit: 16466b109e87792786d0612fb9f94fad39d13d6c
Parents: a74c301
Author: Uwe L. Korn <uw...@xhochy.com>
Authored: Tue Dec 20 16:18:13 2016 -0500
Committer: Wes McKinney <we...@twosigma.com>
Committed: Tue Dec 20 16:18:13 2016 -0500
----------------------------------------------------------------------
src/parquet/arrow/arrow-reader-writer-test.cc | 52 ++++++++++++++++++++++
src/parquet/arrow/arrow-schema-test.cc | 11 +++--
src/parquet/arrow/reader.cc | 51 ++++++++++++++++++++-
src/parquet/arrow/schema.cc | 7 +--
4 files changed, 111 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/16466b10/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 25ba457..a8a5db0 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -391,6 +391,58 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
this->ReadAndCheckSingleColumnTable(values);
}
+using TestInt96ParquetIO = TestParquetIO<::arrow::TimestampType>;
+
+TEST_F(TestInt96ParquetIO, ReadIntoTimestamp) {
+  // This test explicitly tests the conversion from an Impala-style timestamp
+  // to a nanoseconds-since-epoch one. The INT96 value is written with the
+  // low-level parquet-cpp writer and read back through the Arrow reader.
+
+  // 2nd January 1970, 11:35min 145738543ns.
+  // value[2] carries the Julian day number (2440589 is 1970-01-02) and the
+  // first 8 bytes carry the nanoseconds elapsed since midnight of that day,
+  // so only the 11:35 time of day belongs in them -- adding another full day
+  // here would encode 3rd January instead of the expected 2nd January.
+  Int96 day;
+  day.value[2] = 2440589;
+  int64_t seconds_of_day = (11 * 60 + 35) * 60;
+  int64_t nanoseconds_of_day = seconds_of_day * 1000000000 + 145738543;
+  // memcpy instead of writing through reinterpret_cast<int64_t*> avoids a
+  // strict-aliasing violation on the words of `value`.
+  memcpy(&(day.value), &nanoseconds_of_day, sizeof(nanoseconds_of_day));
+  // Compute the corresponding nanosecond timestamp
+  struct tm datetime = {};
+  datetime.tm_year = 70;
+  datetime.tm_mon = 0;
+  datetime.tm_mday = 2;
+  datetime.tm_hour = 11;
+  datetime.tm_min = 35;
+  struct tm epoch = {};
+  epoch.tm_year = 70;
+  epoch.tm_mday = 1;
+  // Nanoseconds since the epoch. Both mktime() calls use the local time zone,
+  // so its offset cancels out in the difference. The cast to int64_t keeps the
+  // multiplication in 64 bits where `long` (lrint's result) is only 32 bits.
+  int64_t seconds_since_epoch =
+      static_cast<int64_t>(lrint(difftime(mktime(&datetime), mktime(&epoch))));
+  int64_t val = seconds_since_epoch * 1000000000 + 145738543;
+
+  std::vector<std::shared_ptr<schema::Node>> fields(
+      {schema::PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96)});
+  std::shared_ptr<schema::GroupNode> schema = std::static_pointer_cast<GroupNode>(
+      schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
+
+  // We cannot write this column with Arrow, so we have to use the plain parquet-cpp API
+  // to write an Int96 file.
+  this->sink_ = std::make_shared<InMemoryOutputStream>();
+  auto writer = ParquetFileWriter::Open(this->sink_, schema);
+  RowGroupWriter* rg_writer = writer->AppendRowGroup(1);
+  ColumnWriter* c_writer = rg_writer->NextColumn();
+  auto typed_writer = dynamic_cast<TypedColumnWriter<Int96Type>*>(c_writer);
+  ASSERT_NE(typed_writer, nullptr);
+  typed_writer->WriteBatch(1, nullptr, nullptr, &day);
+  c_writer->Close();
+  rg_writer->Close();
+  writer->Close();
+
+  ::arrow::TimestampBuilder builder(
+      default_memory_pool(), ::arrow::timestamp(::arrow::TimeUnit::NANO));
+  builder.Append(val);
+  std::shared_ptr<Array> values;
+  ASSERT_OK(builder.Finish(&values));
+  this->ReadAndCheckSingleColumnFile(values.get());
+}
+
using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>;
TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) {
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/16466b10/src/parquet/arrow/arrow-schema-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index 360680f..3d07561 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -47,6 +47,8 @@ const auto DOUBLE = std::make_shared<::arrow::DoubleType>();
const auto UTF8 = std::make_shared<::arrow::StringType>();
const auto TIMESTAMP_MS =
std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI);
+// Nanosecond-resolution Arrow timestamp type; expected for Parquet INT96 columns.
+const std::shared_ptr<::arrow::TimestampType> TIMESTAMP_NS =
+    std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::NANO);
// TODO: This requires parquet-cpp implementing the MICROS enum value
// const auto TIMESTAMP_US = std::make_shared<TimestampType>(TimestampType::Unit::MICRO);
const auto BINARY =
@@ -98,9 +100,9 @@ TEST_F(TestConvertParquetSchema, ParquetFlatPrimitives) {
ParquetType::INT64, LogicalType::TIMESTAMP_MILLIS));
arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_MS, false));
- // parquet_fields.push_back(PrimitiveNode::Make("timestamp", Repetition::REQUIRED,
- // ParquetType::INT64, LogicalType::TIMESTAMP_MICROS));
- // arrow_fields.push_back(std::make_shared<Field>("timestamp", TIMESTAMP_US, false));
+ parquet_fields.push_back(
+ PrimitiveNode::Make("timestamp96", Repetition::REQUIRED, ParquetType::INT96));
+ arrow_fields.push_back(std::make_shared<Field>("timestamp96", TIMESTAMP_NS, false));
parquet_fields.push_back(
PrimitiveNode::Make("float", Repetition::OPTIONAL, ParquetType::FLOAT));
@@ -339,9 +341,6 @@ TEST_F(TestConvertParquetSchema, ParquetLists) {
TEST_F(TestConvertParquetSchema, UnsupportedThings) {
std::vector<NodePtr> unsupported_nodes;
- unsupported_nodes.push_back(
- PrimitiveNode::Make("int96", Repetition::REQUIRED, ParquetType::INT96));
-
unsupported_nodes.push_back(PrimitiveNode::Make(
"int32", Repetition::OPTIONAL, ParquetType::INT32, LogicalType::DATE));
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/16466b10/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index f2d4639..2efa806 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -18,6 +18,7 @@
#include "parquet/arrow/reader.h"
#include <algorithm>
+#include <chrono>
+#include <cstring>
#include <queue>
#include <string>
#include <vector>
@@ -44,6 +45,15 @@ using ParquetReader = parquet::ParquetFileReader;
namespace parquet {
namespace arrow {
+// Julian day number of the Unix epoch (1970-01-01).
+// The LL suffixes keep these constant expressions in 64 bits even on platforms
+// where `long` is 32 bits wide (86400L * 1000L * 1000L overflows there).
+constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
+constexpr int64_t kNanosecondsInADay = 86400LL * 1000LL * 1000LL * 1000LL;
+
+// Converts an Impala-style INT96 timestamp to nanoseconds since the Unix epoch.
+//
+// value[2] holds the Julian day number; the first 8 bytes of `value` hold the
+// nanoseconds elapsed within that day. NOTE(review): there is no range check --
+// the int64 result overflows for dates roughly 292 years away from 1970.
+static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timestamp) {
+  int64_t days_since_epoch = impala_timestamp.value[2] - kJulianToUnixEpochDays;
+  // Copy the bytes out rather than reinterpret_cast-ing `value` to int64_t*:
+  // the cast violates strict aliasing and may be a misaligned 8-byte load.
+  int64_t nanoseconds;
+  std::memcpy(&nanoseconds, &(impala_timestamp.value), sizeof(nanoseconds));
+  return days_since_epoch * kNanosecondsInADay + nanoseconds;
+}
+
template <typename ArrowType>
struct ArrowTypeTraits {
typedef ::arrow::NumericArray<ArrowType> array_type;
@@ -239,6 +249,15 @@ void FlatColumnReader::Impl::ReadNonNullableBatch(
}
+// Non-nullable INT96 -> Timestamp(ns) path: every one of the `values_read`
+// Impala timestamps is converted into the int64_t slot with the same index
+// in the Arrow data buffer.
template <>
+void FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
+    Int96* values, int64_t values_read) {
+  int64_t* const dest = reinterpret_cast<int64_t*>(data_buffer_ptr_);
+  int64_t idx = 0;
+  while (idx < values_read) {
+    dest[idx] = impala_timestamp_to_nanoseconds(values[idx]);
+    ++idx;
+  }
+}
+
+template <>
void FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
bool* values, int64_t values_read) {
for (int64_t i = 0; i < values_read; i++) {
@@ -266,6 +285,22 @@ void FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels,
}
+// Nullable INT96 -> Timestamp(ns) path. For each of the `levels_read`
+// definition levels: a level below the column's maximum marks a null slot
+// (only null_count_ is incremented); otherwise the slot's valid bit is set and
+// the next value from `values` is converted into data slot valid_bits_idx_.
template <>
+void FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::TimestampType, Int96Type>(
+    const int16_t* def_levels, Int96* values, int64_t values_read, int64_t levels_read) {
+  auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
+  // Same width as the int64_t counts (values_read, levels_read) it runs alongside.
+  int64_t values_idx = 0;
+  for (int64_t i = 0; i < levels_read; i++) {
+    if (def_levels[i] < descr_->max_definition_level()) {
+      null_count_++;
+    } else {
+      ::arrow::BitUtil::SetBit(valid_bits_ptr_, valid_bits_idx_);
+      data_ptr[valid_bits_idx_] = impala_timestamp_to_nanoseconds(values[values_idx++]);
+    }
+    valid_bits_idx_++;
+  }
+}
+
+template <>
void FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>(
const int16_t* def_levels, bool* values, int64_t values_read, int64_t levels_read) {
int values_idx = 0;
@@ -518,7 +553,21 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>*
TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
- TYPED_BATCH_CASE(TIMESTAMP, ::arrow::TimestampType, Int64Type)
+ case ::arrow::Type::TIMESTAMP: {
+ ::arrow::TimestampType* timestamp_type =
+ static_cast<::arrow::TimestampType*>(field_->type.get());
+ switch (timestamp_type->unit) {
+ case ::arrow::TimeUnit::MILLI:
+ return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
+ break;
+ case ::arrow::TimeUnit::NANO:
+ return TypedReadBatch<::arrow::TimestampType, Int96Type>(batch_size, out);
+ break;
+ default:
+ return Status::NotImplemented("TimeUnit not supported");
+ }
+ break;
+ }
default:
return Status::NotImplemented(field_->type->ToString());
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/16466b10/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 2875dc6..e578ec2 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -58,6 +58,8 @@ const auto DOUBLE = std::make_shared<::arrow::DoubleType>();
const auto UTF8 = std::make_shared<::arrow::StringType>();
const auto TIMESTAMP_MS =
std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::MILLI);
+// Arrow type produced for Parquet INT96 columns: a nanosecond-resolution timestamp.
+const std::shared_ptr<::arrow::TimestampType> TIMESTAMP_NS =
+    std::make_shared<::arrow::TimestampType>(::arrow::TimestampType::Unit::NANO);
const auto BINARY =
std::make_shared<::arrow::ListType>(std::make_shared<::arrow::Field>("", UINT8));
@@ -162,9 +164,8 @@ Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) {
RETURN_NOT_OK(FromInt64(primitive, out));
break;
case ParquetType::INT96:
- // TODO: Do we have that type in Arrow?
- // type = TypePtr(new Int96Type());
- return Status::NotImplemented("int96");
+ *out = TIMESTAMP_NS;
+ break;
case ParquetType::FLOAT:
*out = FLOAT;
break;