You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/12/15 22:00:34 UTC
[arrow] branch master updated: ARROW-2026: [C++] Enforce use_deprecated_int96_timestamps to all time…
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new ec154d2 ARROW-2026: [C++] Enforce use_deprecated_int96_timestamps to all time…
ec154d2 is described below
commit ec154d232ed5585721e0ef12d61c1a6e2c06fdae
Author: François Saint-Jacques <fs...@gmail.com>
AuthorDate: Sat Dec 15 16:00:27 2018 -0600
ARROW-2026: [C++] Enforce use_deprecated_int96_timestamps to all time…
…stamps fields.
This changes the behavior of `use_deprecated_int96_timestamps` to support
all timestamp fields regardless of the time unit. It would previously
only apply this conversion to fields with Nanosecond resolution.
People will only use this option when they use a system that only
supports INT96 timestamps; systems that also support INT64 timestamps in
other resolutions would not need the option.
A notable API change is that this option now takes precedence over the
coerce_timestamps option.
Author: François Saint-Jacques <fs...@gmail.com>
Closes #3173 from fsaintjacques/ARROW-2026-parquet-int96-conversion and squashes the following commits:
2897a7278 <François Saint-Jacques> ARROW-2026: Enforce use_deprecated_int96_timestamps to all timestamps fields.
---
cpp/src/parquet/arrow/arrow-reader-writer-test.cc | 185 +++++++++++-----------
cpp/src/parquet/arrow/reader.cc | 16 +-
cpp/src/parquet/arrow/schema.cc | 73 ++++++---
cpp/src/parquet/arrow/writer.cc | 74 +++++----
cpp/src/parquet/arrow/writer.h | 62 ++++++--
cpp/src/parquet/types.h | 21 +++
python/pyarrow/parquet.py | 4 +-
python/pyarrow/tests/test_parquet.py | 5 +-
8 files changed, 256 insertions(+), 184 deletions(-)
diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index 07124eb..4e62a22 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1193,65 +1193,116 @@ void MakeDateTimeTypesTable(std::shared_ptr<Table>* out, bool nanos_as_micros =
auto f0 = field("f0", ::arrow::date32());
auto f1 = field("f1", ::arrow::timestamp(TimeUnit::MILLI));
auto f2 = field("f2", ::arrow::timestamp(TimeUnit::MICRO));
- std::shared_ptr<::arrow::Field> f3;
- if (nanos_as_micros) {
- f3 = field("f3", ::arrow::timestamp(TimeUnit::MICRO));
- } else {
- f3 = field("f3", ::arrow::timestamp(TimeUnit::NANO));
- }
+ auto f3_unit = nanos_as_micros ? TimeUnit::MICRO : TimeUnit::NANO;
+ auto f3 = field("f3", ::arrow::timestamp(f3_unit));
auto f4 = field("f4", ::arrow::time32(TimeUnit::MILLI));
auto f5 = field("f5", ::arrow::time64(TimeUnit::MICRO));
+
std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1, f2, f3, f4, f5}));
std::vector<int32_t> t32_values = {1489269000, 1489270000, 1489271000,
1489272000, 1489272000, 1489273000};
- std::vector<int64_t> t64_values = {1489269000000, 1489270000000, 1489271000000,
- 1489272000000, 1489272000000, 1489273000000};
+ std::vector<int64_t> t64_ns_values = {1489269000000, 1489270000000, 1489271000000,
+ 1489272000000, 1489272000000, 1489273000000};
std::vector<int64_t> t64_us_values = {1489269000, 1489270000, 1489271000,
1489272000, 1489272000, 1489273000};
+ std::vector<int64_t> t64_ms_values = {1489269, 1489270, 1489271,
+ 1489272, 1489272, 1489273};
std::shared_ptr<Array> a0, a1, a2, a3, a4, a5;
ArrayFromVector<::arrow::Date32Type, int32_t>(f0->type(), is_valid, t32_values, &a0);
- ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_values, &a1);
- ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_values, &a2);
- if (nanos_as_micros) {
- ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, t64_us_values,
- &a3);
- } else {
- ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, t64_values,
- &a3);
- }
+ ArrayFromVector<::arrow::TimestampType, int64_t>(f1->type(), is_valid, t64_ms_values,
+ &a1);
+ ArrayFromVector<::arrow::TimestampType, int64_t>(f2->type(), is_valid, t64_us_values,
+ &a2);
+ auto f3_data = nanos_as_micros ? t64_us_values : t64_ns_values;
+ ArrayFromVector<::arrow::TimestampType, int64_t>(f3->type(), is_valid, f3_data, &a3);
ArrayFromVector<::arrow::Time32Type, int32_t>(f4->type(), is_valid, t32_values, &a4);
- ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_values, &a5);
+ ArrayFromVector<::arrow::Time64Type, int64_t>(f5->type(), is_valid, t64_us_values, &a5);
std::vector<std::shared_ptr<::arrow::Column>> columns = {
std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1),
std::make_shared<Column>("f2", a2), std::make_shared<Column>("f3", a3),
std::make_shared<Column>("f4", a4), std::make_shared<Column>("f5", a5)};
+
*out = Table::Make(schema, columns);
}
TEST(TestArrowReadWrite, DateTimeTypes) {
- std::shared_ptr<Table> table;
+ std::shared_ptr<Table> table, result;
MakeDateTimeTypesTable(&table);
- // Use deprecated INT96 type
- std::shared_ptr<Table> result;
- ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
- table, false /* use_threads */, table->num_rows(), {}, &result,
- ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build()));
-
- ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
-
// Cast nanaoseconds to microseconds and use INT64 physical type
ASSERT_NO_FATAL_FAILURE(
DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result));
- std::shared_ptr<Table> expected;
MakeDateTimeTypesTable(&table, true);
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*table, *result));
}
+TEST(TestArrowReadWrite, UseDeprecatedInt96) {
+ using ::arrow::ArrayFromVector;
+ using ::arrow::field;
+ using ::arrow::schema;
+
+ std::vector<bool> is_valid = {true, true, true, false, true, true};
+
+ auto t_s = ::arrow::timestamp(TimeUnit::SECOND);
+ auto t_ms = ::arrow::timestamp(TimeUnit::MILLI);
+ auto t_us = ::arrow::timestamp(TimeUnit::MICRO);
+ auto t_ns = ::arrow::timestamp(TimeUnit::NANO);
+
+ std::vector<int64_t> s_values = {1489269, 1489270, 1489271, 1489272, 1489272, 1489273};
+ std::vector<int64_t> ms_values = {1489269000, 1489270000, 1489271000,
+ 1489272001, 1489272000, 1489273000};
+ std::vector<int64_t> us_values = {1489269000000, 1489270000000, 1489271000000,
+ 1489272000001, 1489272000000, 1489273000000};
+ std::vector<int64_t> ns_values = {1489269000000000LL, 1489270000000000LL,
+ 1489271000000000LL, 1489272000000001LL,
+ 1489272000000000LL, 1489273000000000LL};
+
+ std::shared_ptr<Array> a_s, a_ms, a_us, a_ns;
+ ArrayFromVector<::arrow::TimestampType, int64_t>(t_s, is_valid, s_values, &a_s);
+ ArrayFromVector<::arrow::TimestampType, int64_t>(t_ms, is_valid, ms_values, &a_ms);
+ ArrayFromVector<::arrow::TimestampType, int64_t>(t_us, is_valid, us_values, &a_us);
+ ArrayFromVector<::arrow::TimestampType, int64_t>(t_ns, is_valid, ns_values, &a_ns);
+
+ // Each input is typed with a unique TimeUnit
+ auto input_schema = schema(
+ {field("f_s", t_s), field("f_ms", t_ms), field("f_us", t_us), field("f_ns", t_ns)});
+ auto input = Table::Make(
+ input_schema,
+ {std::make_shared<Column>("f_s", a_s), std::make_shared<Column>("f_ms", a_ms),
+ std::make_shared<Column>("f_us", a_us), std::make_shared<Column>("f_ns", a_ns)});
+
+ // When reading parquet files, all int96 schema fields are converted to
+ // timestamp nanoseconds
+ auto ex_schema = schema({field("f_s", t_ns), field("f_ms", t_ns), field("f_us", t_ns),
+ field("f_ns", t_ns)});
+ auto ex_result = Table::Make(
+ ex_schema,
+ {std::make_shared<Column>("f_s", a_ns), std::make_shared<Column>("f_ms", a_ns),
+ std::make_shared<Column>("f_us", a_ns), std::make_shared<Column>("f_ns", a_ns)});
+
+ std::shared_ptr<Table> result;
+ ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
+ input, false /* use_threads */, input->num_rows(), {}, &result,
+ ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build()));
+
+ ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
+
+ // Ensure enable_deprecated_int96_timestamps has precedence over
+ // coerce_timestamps.
+ ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(input, false /* use_threads */,
+ input->num_rows(), {}, &result,
+ ArrowWriterProperties::Builder()
+ .enable_deprecated_int96_timestamps()
+ ->coerce_timestamps(TimeUnit::MILLI)
+ ->build()));
+
+ ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
+}
+
TEST(TestArrowReadWrite, CoerceTimestamps) {
using ::arrow::ArrayFromVector;
using ::arrow::field;
@@ -1297,6 +1348,12 @@ TEST(TestArrowReadWrite, CoerceTimestamps) {
{std::make_shared<Column>("f_s", a_ms), std::make_shared<Column>("f_ms", a_ms),
std::make_shared<Column>("f_us", a_ms), std::make_shared<Column>("f_ns", a_ms)});
+ std::shared_ptr<Table> milli_result;
+ ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
+ input, false /* use_threads */, input->num_rows(), {}, &milli_result,
+ ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build()));
+ ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_milli_result, *milli_result));
+
// Result when coercing to microseconds
auto s3 = std::shared_ptr<::arrow::Schema>(
new ::arrow::Schema({field("f_s", t_us), field("f_ms", t_us), field("f_us", t_us),
@@ -1306,13 +1363,6 @@ TEST(TestArrowReadWrite, CoerceTimestamps) {
{std::make_shared<Column>("f_s", a_us), std::make_shared<Column>("f_ms", a_us),
std::make_shared<Column>("f_us", a_us), std::make_shared<Column>("f_ns", a_us)});
- std::shared_ptr<Table> milli_result;
- ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
- input, false /* use_threads */, input->num_rows(), {}, &milli_result,
- ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build()));
-
- ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_milli_result, *milli_result));
-
std::shared_ptr<Table> micro_result;
ASSERT_NO_FATAL_FAILURE(DoSimpleRoundtrip(
input, false /* use_threads */, input->num_rows(), {}, &micro_result,
@@ -1457,65 +1507,6 @@ TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_table, *result));
}
-// Regression for ARROW-2802
-TEST(TestArrowReadWrite, CoerceTimestampsAndSupportDeprecatedInt96) {
- using ::arrow::Column;
- using ::arrow::default_memory_pool;
- using ::arrow::Field;
- using ::arrow::Schema;
- using ::arrow::Table;
- using ::arrow::TimestampBuilder;
- using ::arrow::TimestampType;
- using ::arrow::TimeUnit;
-
- auto timestamp_type = std::make_shared<TimestampType>(TimeUnit::NANO);
-
- TimestampBuilder builder(timestamp_type, default_memory_pool());
- for (std::int64_t ii = 0; ii < 10; ++ii) {
- ASSERT_OK(builder.Append(1000000000L * ii));
- }
- std::shared_ptr<Array> values;
- ASSERT_OK(builder.Finish(&values));
-
- std::vector<std::shared_ptr<Field>> fields;
- auto field = std::make_shared<Field>("nanos", timestamp_type);
- fields.emplace_back(field);
-
- auto schema = std::make_shared<Schema>(fields);
-
- std::vector<std::shared_ptr<Column>> columns;
- auto column = std::make_shared<Column>("nanos", values);
- columns.emplace_back(column);
-
- auto table = Table::Make(schema, columns);
-
- auto arrow_writer_properties = ArrowWriterProperties::Builder()
- .coerce_timestamps(TimeUnit::MICRO)
- ->enable_deprecated_int96_timestamps()
- ->build();
-
- std::shared_ptr<Table> result;
- DoSimpleRoundtrip(table, false /* use_threads */, table->num_rows(), {}, &result,
- arrow_writer_properties);
-
- ASSERT_EQ(table->num_columns(), result->num_columns());
- ASSERT_EQ(table->num_rows(), result->num_rows());
-
- auto actual_column = result->column(0);
- auto data = actual_column->data();
- auto expected_values =
- static_cast<::arrow::NumericArray<TimestampType>*>(values.get())->raw_values();
- for (int ii = 0; ii < data->num_chunks(); ++ii) {
- auto chunk =
- static_cast<::arrow::NumericArray<TimestampType>*>(data->chunk(ii).get());
- auto values = chunk->raw_values();
- for (int64_t jj = 0; jj < chunk->length(); ++jj, ++expected_values) {
- // Check that the nanos have been converted to micros
- ASSERT_EQ(*expected_values / 1000, values[jj]);
- }
- }
-}
-
void MakeDoubleTable(int num_columns, int num_rows, int nchunks,
std::shared_ptr<Table>* out) {
std::shared_ptr<::arrow::Column> column;
@@ -2289,11 +2280,13 @@ TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) {
INSTANTIATE_TEST_CASE_P(Repetition_type, TestNestedSchemaRead,
::testing::Values(Repetition::REQUIRED, Repetition::OPTIONAL));
-TEST(TestImpalaConversion, NanosecondToImpala) {
+TEST(TestImpalaConversion, ArrowTimestampToImpalaTimestamp) {
// June 20, 2017 16:32:56 and 123456789 nanoseconds
int64_t nanoseconds = INT64_C(1497976376123456789);
- Int96 expected = {{UINT32_C(632093973), UINT32_C(13871), UINT32_C(2457925)}};
+
Int96 calculated;
+
+ Int96 expected = {{UINT32_C(632093973), UINT32_C(13871), UINT32_C(2457925)}};
internal::NanosecondsToImpalaTimestamp(nanoseconds, &calculated);
ASSERT_EQ(expected, calculated);
}
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 2a7730d..7830b6a 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -77,18 +77,6 @@ namespace arrow {
using ::arrow::BitUtil::BytesForBits;
-constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
-constexpr int64_t kMillisecondsInADay = 86400000LL;
-constexpr int64_t kNanosecondsInADay = kMillisecondsInADay * 1000LL * 1000LL;
-
-static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timestamp) {
- int64_t days_since_epoch = impala_timestamp.value[2] - kJulianToUnixEpochDays;
- int64_t nanoseconds = 0;
-
- memcpy(&nanoseconds, &impala_timestamp.value, sizeof(int64_t));
- return days_since_epoch * kNanosecondsInADay + nanoseconds;
-}
-
template <typename ArrowType>
using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
@@ -1045,7 +1033,7 @@ struct TransferFunctor<::arrow::TimestampType, Int96Type> {
auto data_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
for (int64_t i = 0; i < length; i++) {
- *data_ptr++ = impala_timestamp_to_nanoseconds(values[i]);
+ *data_ptr++ = Int96GetNanoSeconds(values[i]);
}
if (reader->nullable_values()) {
@@ -1072,7 +1060,7 @@ struct TransferFunctor<::arrow::Date64Type, Int32Type> {
auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
for (int64_t i = 0; i < length; i++) {
- *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsInADay;
+ *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsPerDay;
}
if (reader->nullable_values()) {
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index d0014a6..af9fbc9 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -423,45 +423,66 @@ Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
return Status::OK();
}
+static LogicalType::type LogicalTypeFromArrowTimeUnit(::arrow::TimeUnit::type time_unit) {
+ switch (time_unit) {
+ case ::arrow::TimeUnit::MILLI:
+ return LogicalType::TIMESTAMP_MILLIS;
+ case ::arrow::TimeUnit::MICRO:
+ return LogicalType::TIMESTAMP_MICROS;
+ case ::arrow::TimeUnit::SECOND:
+ case ::arrow::TimeUnit::NANO:
+ // No equivalent parquet logical type.
+ break;
+ }
+
+ return LogicalType::NONE;
+}
+
static Status GetTimestampMetadata(const ::arrow::TimestampType& type,
const ArrowWriterProperties& properties,
ParquetType::type* physical_type,
LogicalType::type* logical_type) {
- auto unit = type.unit();
- *physical_type = ParquetType::INT64;
+ const bool coerce = properties.coerce_timestamps_enabled();
+ const auto unit = coerce ? properties.coerce_timestamps_unit() : type.unit();
- if (properties.coerce_timestamps_enabled()) {
- auto coerce_unit = properties.coerce_timestamps_unit();
- if (coerce_unit == ::arrow::TimeUnit::MILLI) {
- *logical_type = LogicalType::TIMESTAMP_MILLIS;
- } else if (coerce_unit == ::arrow::TimeUnit::MICRO) {
- *logical_type = LogicalType::TIMESTAMP_MICROS;
- } else {
- return Status::NotImplemented(
- "Can only coerce Arrow timestamps to milliseconds"
- " or microseconds");
+ // The user is explicitly asking for Impala int96 encoding, there is no
+ // logical type.
+ if (properties.support_deprecated_int96_timestamps()) {
+ *physical_type = ParquetType::INT96;
+ return Status::OK();
+ }
+
+ *physical_type = ParquetType::INT64;
+ *logical_type = LogicalTypeFromArrowTimeUnit(unit);
+
+ // The user is requesting that all timestamp columns are casted to a specific
+ // type. Only 2 TimeUnit are supported by arrow-parquet.
+ if (coerce) {
+ switch (unit) {
+ case ::arrow::TimeUnit::MILLI:
+ case ::arrow::TimeUnit::MICRO:
+ break;
+ case ::arrow::TimeUnit::NANO:
+ case ::arrow::TimeUnit::SECOND:
+ return Status::NotImplemented(
+ "Can only coerce Arrow timestamps to milliseconds"
+ " or microseconds");
}
+
return Status::OK();
}
- if (unit == ::arrow::TimeUnit::MILLI) {
- *logical_type = LogicalType::TIMESTAMP_MILLIS;
- } else if (unit == ::arrow::TimeUnit::MICRO) {
+ // Until ARROW-3729 is resolved, nanoseconds are explicitly converted to
+ // int64 microseconds when deprecated int96 is not requested.
+ if (type.unit() == ::arrow::TimeUnit::NANO)
*logical_type = LogicalType::TIMESTAMP_MICROS;
- } else if (unit == ::arrow::TimeUnit::NANO) {
- if (properties.support_deprecated_int96_timestamps()) {
- *physical_type = ParquetType::INT96;
- // No corresponding logical type
- } else {
- *logical_type = LogicalType::TIMESTAMP_MICROS;
- }
- } else {
+ else if (type.unit() == ::arrow::TimeUnit::SECOND)
return Status::NotImplemented(
"Only MILLI, MICRO, and NANOS units supported for Arrow timestamps with "
"Parquet.");
- }
+
return Status::OK();
-}
+} // namespace arrow
Status FieldToNode(const std::shared_ptr<Field>& field,
const WriterProperties& properties,
@@ -698,7 +719,7 @@ int32_t DecimalSize(int32_t precision) {
}
DCHECK(false);
return -1;
-}
+} // namespace arrow
} // namespace arrow
} // namespace parquet
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 402cbf0..bce9f37 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -386,7 +386,11 @@ class ArrowColumnWriter {
Status WriteBatch(int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels,
const typename ParquetType::c_type* values) {
- auto typed_writer = static_cast<TypedColumnWriter<ParquetType>*>(writer_);
+ auto typed_writer =
+ ::arrow::internal::checked_cast<TypedColumnWriter<ParquetType>*>(writer_);
+ // WriteBatch was called with type mismatching the writer_'s type. This
+ // could be a schema conversion problem.
+ DCHECK(typed_writer);
PARQUET_CATCH_NOT_OK(
typed_writer->WriteBatch(num_levels, def_levels, rep_levels, values));
return Status::OK();
@@ -397,7 +401,11 @@ class ArrowColumnWriter {
const int16_t* rep_levels, const uint8_t* valid_bits,
int64_t valid_bits_offset,
const typename ParquetType::c_type* values) {
- auto typed_writer = static_cast<TypedColumnWriter<ParquetType>*>(writer_);
+ auto typed_writer =
+ ::arrow::internal::checked_cast<TypedColumnWriter<ParquetType>*>(writer_);
+ // WriteBatchSpaced was called with type mismatching the writer_'s type. This
+ // could be a schema conversion problem.
+ DCHECK(typed_writer);
PARQUET_CATCH_NOT_OK(typed_writer->WriteBatchSpaced(
num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, values));
return Status::OK();
@@ -570,20 +578,42 @@ NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
+#define CONV_CASE_LOOP(ConversionFunction) \
+ for (int64_t i = 0; i < num_values; i++) \
+ ConversionFunction(arrow_values[i], &output[i]);
+
+static void ConvertArrowTimestampToParquetInt96(const int64_t* arrow_values,
+ int64_t num_values,
+ ::arrow::TimeUnit ::type unit_type,
+ Int96* output) {
+ switch (unit_type) {
+ case TimeUnit::NANO:
+ CONV_CASE_LOOP(internal::NanosecondsToImpalaTimestamp);
+ break;
+ case TimeUnit::MICRO:
+ CONV_CASE_LOOP(internal::MicrosecondsToImpalaTimestamp);
+ break;
+ case TimeUnit::MILLI:
+ CONV_CASE_LOOP(internal::MillisecondsToImpalaTimestamp);
+ break;
+ case TimeUnit::SECOND:
+ CONV_CASE_LOOP(internal::SecondsToImpalaTimestamp);
+ break;
+ }
+}
+
+#undef CONV_CASE_LOOP
+
template <>
Status ArrowColumnWriter::WriteNullableBatch<Int96Type, ::arrow::TimestampType>(
const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
int64_t valid_bits_offset, const int64_t* values) {
- Int96* buffer;
+ Int96* buffer = nullptr;
RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));
- if (type.unit() == TimeUnit::NANO) {
- for (int i = 0; i < num_values; i++) {
- internal::NanosecondsToImpalaTimestamp(values[i], &buffer[i]);
- }
- } else {
- return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
- }
+
+ ConvertArrowTimestampToParquetInt96(values, num_values, type.unit(), buffer);
+
return WriteBatchSpaced<Int96Type>(num_levels, def_levels, rep_levels, valid_bits,
valid_bits_offset, buffer);
}
@@ -592,15 +622,11 @@ template <>
Status ArrowColumnWriter::WriteNonNullableBatch<Int96Type, ::arrow::TimestampType>(
const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) {
- Int96* buffer;
+ Int96* buffer = nullptr;
RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));
- if (type.unit() == TimeUnit::NANO) {
- for (int i = 0; i < num_values; i++) {
- internal::NanosecondsToImpalaTimestamp(values[i], buffer + i);
- }
- } else {
- return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
- }
+
+ ConvertArrowTimestampToParquetInt96(values, num_values, type.unit(), buffer);
+
return WriteBatch<Int96Type>(num_levels, def_levels, rep_levels, buffer);
}
@@ -611,21 +637,15 @@ Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_level
const bool is_nanosecond = type.unit() == TimeUnit::NANO;
- // In the case where support_deprecated_int96_timestamps was specified
- // and coerce_timestamps_enabled was specified, a nanosecond column
- // will have a physical type of int64. In that case, we fall through
- // to the else if below.
- //
- // See https://issues.apache.org/jira/browse/ARROW-2082
- if (is_nanosecond && ctx_->properties->support_deprecated_int96_timestamps() &&
- !ctx_->properties->coerce_timestamps_enabled()) {
+ if (ctx_->properties->support_deprecated_int96_timestamps()) {
+ // The user explicitly required to use Int96 storage.
return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(values, num_levels,
def_levels, rep_levels);
} else if (is_nanosecond ||
(ctx_->properties->coerce_timestamps_enabled() &&
(type.unit() != ctx_->properties->coerce_timestamps_unit()))) {
// Casting is required. This covers several cases
- // * Nanoseconds -> cast to microseconds
+ // * Nanoseconds -> cast to microseconds (until ARROW-3729 is resolved)
// * coerce_timestamps_enabled_, cast all timestamps to requested unit
return WriteTimestampsCoerce(ctx_->properties->truncated_timestamps_allowed(), values,
num_levels, def_levels, rep_levels);
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index 2538c02..50cb4cf 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -45,19 +45,19 @@ class PARQUET_EXPORT ArrowWriterProperties {
class Builder {
public:
Builder()
- : write_nanos_as_int96_(false),
+ : write_timestamps_as_int96_(false),
coerce_timestamps_enabled_(false),
coerce_timestamps_unit_(::arrow::TimeUnit::SECOND),
truncated_timestamps_allowed_(false) {}
virtual ~Builder() {}
Builder* disable_deprecated_int96_timestamps() {
- write_nanos_as_int96_ = false;
+ write_timestamps_as_int96_ = false;
return this;
}
Builder* enable_deprecated_int96_timestamps() {
- write_nanos_as_int96_ = true;
+ write_timestamps_as_int96_ = true;
return this;
}
@@ -79,19 +79,19 @@ class PARQUET_EXPORT ArrowWriterProperties {
std::shared_ptr<ArrowWriterProperties> build() {
return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties(
- write_nanos_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_,
+ write_timestamps_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_,
truncated_timestamps_allowed_));
}
private:
- bool write_nanos_as_int96_;
+ bool write_timestamps_as_int96_;
bool coerce_timestamps_enabled_;
::arrow::TimeUnit::type coerce_timestamps_unit_;
bool truncated_timestamps_allowed_;
};
- bool support_deprecated_int96_timestamps() const { return write_nanos_as_int96_; }
+ bool support_deprecated_int96_timestamps() const { return write_timestamps_as_int96_; }
bool coerce_timestamps_enabled() const { return coerce_timestamps_enabled_; }
::arrow::TimeUnit::type coerce_timestamps_unit() const {
@@ -105,12 +105,12 @@ class PARQUET_EXPORT ArrowWriterProperties {
bool coerce_timestamps_enabled,
::arrow::TimeUnit::type coerce_timestamps_unit,
bool truncated_timestamps_allowed)
- : write_nanos_as_int96_(write_nanos_as_int96),
+ : write_timestamps_as_int96_(write_nanos_as_int96),
coerce_timestamps_enabled_(coerce_timestamps_enabled),
coerce_timestamps_unit_(coerce_timestamps_unit),
truncated_timestamps_allowed_(truncated_timestamps_allowed) {}
- const bool write_nanos_as_int96_;
+ const bool write_timestamps_as_int96_;
const bool coerce_timestamps_enabled_;
const ::arrow::TimeUnit::type coerce_timestamps_unit_;
const bool truncated_timestamps_allowed_;
@@ -208,24 +208,52 @@ namespace internal {
* Timestamp conversion constants
*/
constexpr int64_t kJulianEpochOffsetDays = INT64_C(2440588);
-constexpr int64_t kNanosecondsPerDay = INT64_C(86400000000000);
-/**
- * Converts nanosecond timestamps to Impala (Int96) format
- */
-inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds,
- Int96* impala_timestamp) {
- int64_t julian_days = (nanoseconds / kNanosecondsPerDay) + kJulianEpochOffsetDays;
+template <int64_t UnitPerDay, int64_t NanosecondsPerUnit>
+inline void ArrowTimestampToImpalaTimestamp(const int64_t time, Int96* impala_timestamp) {
+ int64_t julian_days = (time / UnitPerDay) + kJulianEpochOffsetDays;
(*impala_timestamp).value[2] = (uint32_t)julian_days;
- int64_t last_day_nanos = nanoseconds % kNanosecondsPerDay;
+ int64_t last_day_units = time % UnitPerDay;
int64_t* impala_last_day_nanos = reinterpret_cast<int64_t*>(impala_timestamp);
- *impala_last_day_nanos = last_day_nanos;
+ *impala_last_day_nanos = last_day_units * NanosecondsPerUnit;
+}
+
+constexpr int64_t kSecondsInNanos = INT64_C(1000000000);
+
+inline void SecondsToImpalaTimestamp(const int64_t seconds, Int96* impala_timestamp) {
+ ArrowTimestampToImpalaTimestamp<kSecondsPerDay, kSecondsInNanos>(seconds,
+ impala_timestamp);
+}
+
+constexpr int64_t kMillisecondsInNanos = kSecondsInNanos / INT64_C(1000);
+
+inline void MillisecondsToImpalaTimestamp(const int64_t milliseconds,
+ Int96* impala_timestamp) {
+ ArrowTimestampToImpalaTimestamp<kMillisecondsPerDay, kMillisecondsInNanos>(
+ milliseconds, impala_timestamp);
+}
+
+constexpr int64_t kMicrosecondsInNanos = kMillisecondsInNanos / INT64_C(1000);
+
+inline void MicrosecondsToImpalaTimestamp(const int64_t microseconds,
+ Int96* impala_timestamp) {
+ ArrowTimestampToImpalaTimestamp<kMicrosecondsPerDay, kMicrosecondsInNanos>(
+ microseconds, impala_timestamp);
+}
+
+constexpr int64_t kNanosecondsInNanos = INT64_C(1);
+
+inline void NanosecondsToImpalaTimestamp(const int64_t nanoseconds,
+ Int96* impala_timestamp) {
+ ArrowTimestampToImpalaTimestamp<kNanosecondsPerDay, kNanosecondsInNanos>(
+ nanoseconds, impala_timestamp);
}
} // namespace internal
} // namespace arrow
+
} // namespace parquet
#endif // PARQUET_ARROW_WRITER_H
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index b277180..1812f55 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -175,6 +175,19 @@ struct FixedLenByteArray {
using FLBA = FixedLenByteArray;
+// Julian day at unix epoch.
+//
+// The Julian Day Number (JDN) is the integer assigned to a whole solar day in
+// the Julian day count starting from noon Universal time, with Julian day
+// number 0 assigned to the day starting at noon on Monday, January 1, 4713 BC,
+// proleptic Julian calendar (November 24, 4714 BC, in the proleptic Gregorian
+// calendar),
+constexpr int64_t kJulianToUnixEpochDays = INT64_C(2440588);
+constexpr int64_t kSecondsPerDay = INT64_C(60 * 60 * 24);
+constexpr int64_t kMillisecondsPerDay = kSecondsPerDay * INT64_C(1000);
+constexpr int64_t kMicrosecondsPerDay = kMillisecondsPerDay * INT64_C(1000);
+constexpr int64_t kNanosecondsPerDay = kMicrosecondsPerDay * INT64_C(1000);
+
MANUALLY_ALIGNED_STRUCT(1) Int96 { uint32_t value[3]; };
STRUCT_END(Int96, 12);
@@ -192,6 +205,14 @@ static inline void Int96SetNanoSeconds(parquet::Int96& i96, int64_t nanoseconds)
std::memcpy(&i96.value, &nanoseconds, sizeof(nanoseconds));
}
+static inline int64_t Int96GetNanoSeconds(const parquet::Int96& i96) {
+ int64_t days_since_epoch = i96.value[2] - kJulianToUnixEpochDays;
+ int64_t nanoseconds = 0;
+
+ memcpy(&nanoseconds, &i96.value, sizeof(int64_t));
+ return days_since_epoch * kNanosecondsPerDay + nanoseconds;
+}
+
static inline std::string Int96ToString(const Int96& a) {
std::ostringstream result;
std::copy(a.value, a.value + 3, std::ostream_iterator<uint32_t>(result, " "));
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index b89145a..feaa890 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -284,8 +284,8 @@ use_dictionary : bool or list
Specify if we should use dictionary encoding in general or only for
some columns.
use_deprecated_int96_timestamps : boolean, default None
- Write nanosecond resolution timestamps to INT96 Parquet
- format. Defaults to False unless enabled by flavor argument
+ Write timestamps to INT96 Parquet format. Defaults to False unless enabled
+ by flavor argument. This takes priority over the coerce_timestamps option.
coerce_timestamps : string, default None
Cast timestamps a particular resolution.
Valid values: {None, 'ms', 'us'}
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 5c27a9b..82c80e9 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -844,7 +844,7 @@ def test_date_time_types():
a2 = pa.array(data2, type=t2)
t3 = pa.timestamp('us')
- start = pd.Timestamp('2000-01-01').value / 1000
+ start = pd.Timestamp('2001-01-01').value / 1000
data3 = np.array([start, start + 1, start + 2], dtype='int64')
a3 = pa.array(data3, type=t3)
@@ -892,8 +892,9 @@ def test_date_time_types():
# date64 as date32
# time32[s] to time32[ms]
+ # 'timestamp[ms]' is saved as INT96 timestamp
# 'timestamp[ns]' is saved as INT96 timestamp
- expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7],
+ expected = pa.Table.from_arrays([a1, a1, a7, a4, a5, ex_a6, a7],
['date32', 'date64', 'timestamp[us]',
'time32[s]', 'time64[us]',
'time32_from64[s]',