You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original archive page and is not preserved in this plain-text version.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/07/13 17:02:30 UTC
[arrow] 35/43: ARROW-5878: [C++][Parquet] Restore pre-0.14.0
Parquet forward compatibility by adding option to unconditionally set
TIMESTAMP_MICROS/TIMESTAMP_MILLIS ConvertedType
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch maint-0.14.x
in repository https://gitbox.apache.org/repos/asf/arrow.git
commit 6dadd494616fa5ebd37be94ceec98a50324fa7fb
Author: Wes McKinney <we...@apache.org>
AuthorDate: Fri Jul 12 12:47:36 2019 -0500
ARROW-5878: [C++][Parquet] Restore pre-0.14.0 Parquet forward compatibility by adding option to unconditionally set TIMESTAMP_MICROS/TIMESTAMP_MILLIS ConvertedType
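Set the TIMESTAMP_MICROS/TIMESTAMP_MILLIS ConvertedType regardless of whether `isAdjustedToUTC` is set in the LogicalType (opt-in via the new `force_set_converted_type` option, which the Arrow schema conversion always enables)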
Author: Wes McKinney <we...@apache.org>
Author: Benjamin Kietzman <be...@gmail.com>
Closes #4825 from bkietz/5878-C-Parquet-reader-not-forward-compatible- and squashes the following commits:
f5213f576 <Wes McKinney> Add force_set_converted_type to ToString and ToJSON output
9350fea40 <Wes McKinney> Add option to force setting ConvertedType for timestamps
b8fd4d61d <Benjamin Kietzman> Coerce strictly to TIMESTAMP_MICROS/TIMESTAMP_MILLIS
---
cpp/src/parquet/arrow/arrow-schema-test.cc | 21 +++++++-----
cpp/src/parquet/arrow/schema.cc | 12 +++++--
cpp/src/parquet/schema-test.cc | 42 +++++++++++++++--------
cpp/src/parquet/types.cc | 55 ++++++++++++++++++++++--------
cpp/src/parquet/types.h | 18 ++++++++--
5 files changed, 106 insertions(+), 42 deletions(-)
diff --git a/cpp/src/parquet/arrow/arrow-schema-test.cc b/cpp/src/parquet/arrow/arrow-schema-test.cc
index 9a43c7d..6972620 100644
--- a/cpp/src/parquet/arrow/arrow-schema-test.cc
+++ b/cpp/src/parquet/arrow/arrow-schema-test.cc
@@ -855,14 +855,18 @@ TEST_F(TestConvertArrowSchema, ArrowFields) {
{"time64(nanosecond)", ::arrow::time64(::arrow::TimeUnit::NANO),
LogicalType::Time(true, LogicalType::TimeUnit::NANOS), ParquetType::INT64, -1},
{"timestamp(millisecond)", ::arrow::timestamp(::arrow::TimeUnit::MILLI),
- LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS), ParquetType::INT64,
- -1},
+ LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS,
+ /*force_set_converted_type=*/true),
+ ParquetType::INT64, -1},
{"timestamp(microsecond)", ::arrow::timestamp(::arrow::TimeUnit::MICRO),
- LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS), ParquetType::INT64,
- -1},
+ LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS,
+ /*force_set_converted_type=*/true),
+ ParquetType::INT64, -1},
+ // Parquet format v1: nanosecond timestamps are coerced to microseconds
{"timestamp(nanosecond)", ::arrow::timestamp(::arrow::TimeUnit::NANO),
- LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS), ParquetType::INT64,
- -1},
+ LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS,
+ /*force_set_converted_type=*/true),
+ ParquetType::INT64, -1},
{"timestamp(millisecond, UTC)", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "UTC"),
LogicalType::Timestamp(true, LogicalType::TimeUnit::MILLIS), ParquetType::INT64,
-1},
@@ -887,14 +891,15 @@ TEST_F(TestConvertArrowSchema, ArrowFields) {
std::vector<NodePtr> parquet_fields;
for (const FieldConstructionArguments& c : cases) {
- arrow_fields.push_back(std::make_shared<Field>(c.name, c.datatype, false));
+ arrow_fields.push_back(::arrow::field(c.name, c.datatype, false));
parquet_fields.push_back(PrimitiveNode::Make(c.name, Repetition::REQUIRED,
c.logical_type, c.physical_type,
c.physical_length));
}
ASSERT_OK(ConvertSchema(arrow_fields));
- ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
+ // Keep the wrapper so a fatal assertion inside CheckFlatSchema fails this test.
+ ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
}
TEST_F(TestConvertArrowSchema, ArrowNonconvertibleFields) {
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index 9284c8c..12d4194 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -521,11 +521,19 @@ Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
static std::shared_ptr<const LogicalType> TimestampLogicalTypeFromArrowTimestamp(
const ::arrow::TimestampType& timestamp_type, ::arrow::TimeUnit::type time_unit) {
const bool utc = !(timestamp_type.timezone().empty());
+ // ARROW-5878(wesm): for forward compatibility, and because there is no
+ // other way to signal to old readers that values are timestamps, we force
+ // the legacy ConvertedType to the matching TIMESTAMP_MILLIS/TIMESTAMP_MICROS
+ // value (NANOS has no ConvertedType counterpart, so it is not forced). This
+ // causes some ambiguity, as Parquet readers have not been consistent about
+ // whether TIMESTAMP_* values are UTC-normalized.
switch (time_unit) {
case ::arrow::TimeUnit::MILLI:
- return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MILLIS);
+ return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MILLIS,
+ /*force_set_converted_type=*/true);
case ::arrow::TimeUnit::MICRO:
- return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MICROS);
+ return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MICROS,
+ /*force_set_converted_type=*/true);
case ::arrow::TimeUnit::NANO:
return LogicalType::Timestamp(utc, LogicalType::TimeUnit::NANOS);
case ::arrow::TimeUnit::SECOND:
diff --git a/cpp/src/parquet/schema-test.cc b/cpp/src/parquet/schema-test.cc
index 4e11d1f..badd997 100644
--- a/cpp/src/parquet/schema-test.cc
+++ b/cpp/src/parquet/schema-test.cc
@@ -1079,8 +1079,6 @@ TEST(TestLogicalTypeConstruction, NewTypeIncompatibility) {
{LogicalType::Time(false, LogicalType::TimeUnit::MICROS), check_is_time},
{LogicalType::Time(false, LogicalType::TimeUnit::NANOS), check_is_time},
{LogicalType::Time(true, LogicalType::TimeUnit::NANOS), check_is_time},
- {LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS), check_is_timestamp},
- {LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS), check_is_timestamp},
{LogicalType::Timestamp(false, LogicalType::TimeUnit::NANOS), check_is_timestamp},
{LogicalType::Timestamp(true, LogicalType::TimeUnit::NANOS), check_is_timestamp},
};
@@ -1399,23 +1397,29 @@ TEST(TestLogicalTypeOperation, LogicalTypeRepresentation) {
"Time(isAdjustedToUTC=false, timeUnit=nanoseconds)",
R"({"Type": "Time", "isAdjustedToUTC": false, "timeUnit": "nanoseconds"})"},
{LogicalType::Timestamp(true, LogicalType::TimeUnit::MILLIS),
- "Timestamp(isAdjustedToUTC=true, timeUnit=milliseconds)",
- R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "milliseconds"})"},
+ "Timestamp(isAdjustedToUTC=true, timeUnit=milliseconds, "
+ "force_set_converted_type=false)",
+ R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "milliseconds", "force_set_converted_type": false})"},
{LogicalType::Timestamp(true, LogicalType::TimeUnit::MICROS),
- "Timestamp(isAdjustedToUTC=true, timeUnit=microseconds)",
- R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "microseconds"})"},
+ "Timestamp(isAdjustedToUTC=true, timeUnit=microseconds, "
+ "force_set_converted_type=false)",
+ R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "microseconds", "force_set_converted_type": false})"},
{LogicalType::Timestamp(true, LogicalType::TimeUnit::NANOS),
- "Timestamp(isAdjustedToUTC=true, timeUnit=nanoseconds)",
- R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "nanoseconds"})"},
- {LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS),
- "Timestamp(isAdjustedToUTC=false, timeUnit=milliseconds)",
- R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "milliseconds"})"},
+ "Timestamp(isAdjustedToUTC=true, timeUnit=nanoseconds, "
+ "force_set_converted_type=false)",
+ R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "nanoseconds", "force_set_converted_type": false})"},
+ {LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS, /*force_set_converted_type=*/true),
+ "Timestamp(isAdjustedToUTC=false, timeUnit=milliseconds, "
+ "force_set_converted_type=true)",
+ R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "milliseconds", "force_set_converted_type": true})"},
{LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS),
- "Timestamp(isAdjustedToUTC=false, timeUnit=microseconds)",
- R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "microseconds"})"},
+ "Timestamp(isAdjustedToUTC=false, timeUnit=microseconds, "
+ "force_set_converted_type=false)",
+ R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "microseconds", "force_set_converted_type": false})"},
{LogicalType::Timestamp(false, LogicalType::TimeUnit::NANOS),
- "Timestamp(isAdjustedToUTC=false, timeUnit=nanoseconds)",
- R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "nanoseconds"})"},
+ "Timestamp(isAdjustedToUTC=false, timeUnit=nanoseconds, "
+ "force_set_converted_type=false)",
+ R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "nanoseconds", "force_set_converted_type": false})"},
{LogicalType::Interval(), "Interval", R"({"Type": "Interval"})"},
{LogicalType::Int(8, false), "Int(bitWidth=8, isSigned=false)",
R"({"Type": "Int", "bitWidth": 8, "isSigned": false})"},
@@ -1914,10 +1918,18 @@ TEST_F(TestTemporalSchemaElementConstruction, TemporalCases) {
Type::INT64, -1, true, ConvertedType::TIMESTAMP_MILLIS, true, check_TIMESTAMP},
{"timestamp_F_ms", LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS),
Type::INT64, -1, false, ConvertedType::NA, true, check_TIMESTAMP},
+ {"timestamp_F_ms_force",
+ LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS,
+ /*force_set_converted_type=*/true),
+ Type::INT64, -1, true, ConvertedType::TIMESTAMP_MILLIS, true, check_TIMESTAMP},
{"timestamp_T_us", LogicalType::Timestamp(true, LogicalType::TimeUnit::MICROS),
Type::INT64, -1, true, ConvertedType::TIMESTAMP_MICROS, true, check_TIMESTAMP},
{"timestamp_F_us", LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS),
Type::INT64, -1, false, ConvertedType::NA, true, check_TIMESTAMP},
+ {"timestamp_F_us_force",
+ LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS,
+ /*force_set_converted_type=*/true),
+ Type::INT64, -1, true, ConvertedType::TIMESTAMP_MICROS, true, check_TIMESTAMP},
{"timestamp_T_ns", LogicalType::Timestamp(true, LogicalType::TimeUnit::NANOS),
Type::INT64, -1, false, ConvertedType::NA, true, check_TIMESTAMP},
{"timestamp_F_ns", LogicalType::Timestamp(false, LogicalType::TimeUnit::NANOS),
diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc
index 644e28f..2c75439 100644
--- a/cpp/src/parquet/types.cc
+++ b/cpp/src/parquet/types.cc
@@ -496,9 +496,11 @@ std::shared_ptr<const LogicalType> LogicalType::Time(
}
std::shared_ptr<const LogicalType> LogicalType::Timestamp(
- bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit) {
+ bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit,
+ bool force_set_converted_type) {
DCHECK(time_unit != LogicalType::TimeUnit::UNKNOWN);
- return TimestampLogicalType::Make(is_adjusted_to_utc, time_unit);
+ return TimestampLogicalType::Make(is_adjusted_to_utc, time_unit,
+ force_set_converted_type);
}
std::shared_ptr<const LogicalType> LogicalType::Interval() {
@@ -1204,14 +1206,19 @@ class LogicalType::Impl::Timestamp final : public LogicalType::Impl::Compatible,
bool is_adjusted_to_utc() const { return adjusted_; }
LogicalType::TimeUnit::unit time_unit() const { return unit_; }
+ bool force_set_converted_type() const { return force_set_converted_type_; }
+
private:
- Timestamp(bool a, LogicalType::TimeUnit::unit u)
+ Timestamp(bool adjusted, LogicalType::TimeUnit::unit unit,
+ bool force_set_converted_type)
: LogicalType::Impl(LogicalType::Type::TIMESTAMP, SortOrder::SIGNED),
LogicalType::Impl::SimpleApplicable(parquet::Type::INT64),
- adjusted_(a),
- unit_(u) {}
+ adjusted_(adjusted),
+ unit_(unit),
+ force_set_converted_type_(force_set_converted_type) {}
bool adjusted_ = false;
LogicalType::TimeUnit::unit unit_;
+ bool force_set_converted_type_ = false;  // if true, ToConvertedType() reports TIMESTAMP_MILLIS/MICROS even when !adjusted_
};
bool LogicalType::Impl::Timestamp::is_compatible(
@@ -1219,10 +1226,20 @@ bool LogicalType::Impl::Timestamp::is_compatible(
schema::DecimalMetadata converted_decimal_metadata) const {
if (converted_decimal_metadata.isset) {
return false;
- } else if (adjusted_ && unit_ == LogicalType::TimeUnit::MILLIS) {
- return converted_type == ConvertedType::TIMESTAMP_MILLIS;
- } else if (adjusted_ && unit_ == LogicalType::TimeUnit::MICROS) {
- return converted_type == ConvertedType::TIMESTAMP_MICROS;
+ } else if (unit_ == LogicalType::TimeUnit::MILLIS) {
+ if (adjusted_ || force_set_converted_type_) {  // must carry the legacy TIMESTAMP_MILLIS
+ return converted_type == ConvertedType::TIMESTAMP_MILLIS;
+ } else {
+ return (converted_type == ConvertedType::NONE) ||
+ (converted_type == ConvertedType::NA);
+ }
+ } else if (unit_ == LogicalType::TimeUnit::MICROS) {
+ if (adjusted_ || force_set_converted_type_) {  // must carry the legacy TIMESTAMP_MICROS
+ return converted_type == ConvertedType::TIMESTAMP_MICROS;
+ } else {
+ return (converted_type == ConvertedType::NONE) ||
+ (converted_type == ConvertedType::NA);
+ }
} else {
return (converted_type == ConvertedType::NONE) ||
(converted_type == ConvertedType::NA);
@@ -1232,7 +1249,7 @@ bool LogicalType::Impl::Timestamp::is_compatible(
ConvertedType::type LogicalType::Impl::Timestamp::ToConvertedType(
schema::DecimalMetadata* out_decimal_metadata) const {
reset_decimal_metadata(out_decimal_metadata);
- if (adjusted_) {
+ if (adjusted_ || force_set_converted_type_) {  // UTC-adjusted or forced: emit a TIMESTAMP_* ConvertedType
if (unit_ == LogicalType::TimeUnit::MILLIS) {
return ConvertedType::TIMESTAMP_MILLIS;
} else if (unit_ == LogicalType::TimeUnit::MICROS) {
@@ -1245,14 +1262,16 @@ ConvertedType::type LogicalType::Impl::Timestamp::ToConvertedType(
std::string LogicalType::Impl::Timestamp::ToString() const {
std::stringstream type;
type << "Timestamp(isAdjustedToUTC=" << std::boolalpha << adjusted_
- << ", timeUnit=" << time_unit_string(unit_) << ")";
+ << ", timeUnit=" << time_unit_string(unit_)
+ << ", force_set_converted_type=" << force_set_converted_type_ << ")";
return type.str();
}
std::string LogicalType::Impl::Timestamp::ToJSON() const {
std::stringstream json;
json << R"({"Type": "Timestamp", "isAdjustedToUTC": )" << std::boolalpha << adjusted_
- << R"(, "timeUnit": ")" << time_unit_string(unit_) << R"("})";
+ << R"(, "timeUnit": ")" << time_unit_string(unit_)
+ << R"(", "force_set_converted_type": )" << force_set_converted_type_ << R"(})";
return json.str();
}
@@ -1288,13 +1307,14 @@ bool LogicalType::Impl::Timestamp::Equals(const LogicalType& other) const {
}
std::shared_ptr<const LogicalType> TimestampLogicalType::Make(
- bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit) {
+ bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit,
+ bool force_set_converted_type) {
if (time_unit == LogicalType::TimeUnit::MILLIS ||
time_unit == LogicalType::TimeUnit::MICROS ||
time_unit == LogicalType::TimeUnit::NANOS) {
auto* logical_type = new TimestampLogicalType();
- logical_type->impl_.reset(
- new LogicalType::Impl::Timestamp(is_adjusted_to_utc, time_unit));
+ logical_type->impl_.reset(new LogicalType::Impl::Timestamp(
+ is_adjusted_to_utc, time_unit, force_set_converted_type));
return std::shared_ptr<const LogicalType>(logical_type);
} else {
throw ParquetException(
@@ -1310,6 +1330,11 @@ LogicalType::TimeUnit::unit TimestampLogicalType::time_unit() const {
return (dynamic_cast<const LogicalType::Impl::Timestamp&>(*impl_)).time_unit();
}
+bool TimestampLogicalType::force_set_converted_type() const {
+ return (dynamic_cast<const LogicalType::Impl::Timestamp&>(*impl_))  // Make() installs an Impl::Timestamp
+ .force_set_converted_type();
+}
+
class LogicalType::Impl::Interval final : public LogicalType::Impl::SimpleCompatible,
public LogicalType::Impl::TypeLengthApplicable {
public:
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index 0bfaf99..6f1906f 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -182,8 +182,17 @@ class PARQUET_EXPORT LogicalType {
static std::shared_ptr<const LogicalType> Date();
static std::shared_ptr<const LogicalType> Time(bool is_adjusted_to_utc,
LogicalType::TimeUnit::unit time_unit);
+
+ /// \brief Create a Timestamp logical type
+ /// \param[in] is_adjusted_to_utc set true if the data is UTC-normalized
+ /// \param[in] time_unit the resolution of the timestamp
+ /// \param[in] force_set_converted_type if true, set the legacy ConvertedType
+ /// (TIMESTAMP_MILLIS/TIMESTAMP_MICROS) even when is_adjusted_to_utc is false;
+ /// NANOS has no legacy ConvertedType and is unaffected. Default is false
static std::shared_ptr<const LogicalType> Timestamp(
- bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit);
+ bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit,
+ bool force_set_converted_type = false);
+
static std::shared_ptr<const LogicalType> Interval();
static std::shared_ptr<const LogicalType> Int(int bit_width, bool is_signed);
static std::shared_ptr<const LogicalType> Null();
@@ -337,10 +346,15 @@ class PARQUET_EXPORT TimeLogicalType : public LogicalType {
class PARQUET_EXPORT TimestampLogicalType : public LogicalType {
public:
static std::shared_ptr<const LogicalType> Make(bool is_adjusted_to_utc,
- LogicalType::TimeUnit::unit time_unit);
+ LogicalType::TimeUnit::unit time_unit,
+ bool force_set_converted_type = false);
bool is_adjusted_to_utc() const;
LogicalType::TimeUnit::unit time_unit() const;
+ /// \brief If true, the legacy ConvertedType (TIMESTAMP_MILLIS/TIMESTAMP_MICROS)
+ /// is set for millis/micros resolution even when is_adjusted_to_utc() is false
+ bool force_set_converted_type() const;
+
private:
TimestampLogicalType() = default;
};