You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/07/13 17:02:30 UTC

[arrow] 35/43: ARROW-5878: [C++][Parquet] Restore pre-0.14.0 Parquet forward compatibility by adding option to unconditionally set TIMESTAMP_MICROS/TIMESTAMP_MILLIS ConvertedType

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch maint-0.14.x
in repository https://gitbox.apache.org/repos/asf/arrow.git

commit 6dadd494616fa5ebd37be94ceec98a50324fa7fb
Author: Wes McKinney <we...@apache.org>
AuthorDate: Fri Jul 12 12:47:36 2019 -0500

    ARROW-5878: [C++][Parquet] Restore pre-0.14.0 Parquet forward compatibility by adding option to unconditionally set TIMESTAMP_MICROS/TIMESTAMP_MILLIS ConvertedType
    
    Set TIMESTAMP_MICROS/TIMESTAMP_MILLIS ConvertedType regardless of whether `isAdjustedToUTC` is set in the LogicalType
    
    Author: Wes McKinney <we...@apache.org>
    Author: Benjamin Kietzman <be...@gmail.com>
    
    Closes #4825 from bkietz/5878-C-Parquet-reader-not-forward-compatible- and squashes the following commits:
    
    f5213f576 <Wes McKinney> Add force_set_converted_type to ToString and ToJSON output
    9350fea40 <Wes McKinney> Add option to force setting ConvertedType for timestamps
    b8fd4d61d <Benjamin Kietzman> Coerce strictly to TIMESTAMP_MICROS/TIMESTAMP_MILLIS
---
 cpp/src/parquet/arrow/arrow-schema-test.cc | 21 +++++++-----
 cpp/src/parquet/arrow/schema.cc            | 12 +++++--
 cpp/src/parquet/schema-test.cc             | 42 +++++++++++++++--------
 cpp/src/parquet/types.cc                   | 55 ++++++++++++++++++++++--------
 cpp/src/parquet/types.h                    | 18 ++++++++--
 5 files changed, 106 insertions(+), 42 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow-schema-test.cc b/cpp/src/parquet/arrow/arrow-schema-test.cc
index 9a43c7d..6972620 100644
--- a/cpp/src/parquet/arrow/arrow-schema-test.cc
+++ b/cpp/src/parquet/arrow/arrow-schema-test.cc
@@ -855,14 +855,18 @@ TEST_F(TestConvertArrowSchema, ArrowFields) {
       {"time64(nanosecond)", ::arrow::time64(::arrow::TimeUnit::NANO),
        LogicalType::Time(true, LogicalType::TimeUnit::NANOS), ParquetType::INT64, -1},
       {"timestamp(millisecond)", ::arrow::timestamp(::arrow::TimeUnit::MILLI),
-       LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS), ParquetType::INT64,
-       -1},
+       LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS,
+                              /*force_set_converted_type=*/true),
+       ParquetType::INT64, -1},
       {"timestamp(microsecond)", ::arrow::timestamp(::arrow::TimeUnit::MICRO),
-       LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS), ParquetType::INT64,
-       -1},
+       LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS,
+                              /*force_set_converted_type=*/true),
+       ParquetType::INT64, -1},
+      // Parquet v1, values converted to microseconds
       {"timestamp(nanosecond)", ::arrow::timestamp(::arrow::TimeUnit::NANO),
-       LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS), ParquetType::INT64,
-       -1},
+       LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS,
+                              /*force_set_converted_type=*/true),
+       ParquetType::INT64, -1},
       {"timestamp(millisecond, UTC)", ::arrow::timestamp(::arrow::TimeUnit::MILLI, "UTC"),
        LogicalType::Timestamp(true, LogicalType::TimeUnit::MILLIS), ParquetType::INT64,
        -1},
@@ -887,14 +891,15 @@ TEST_F(TestConvertArrowSchema, ArrowFields) {
   std::vector<NodePtr> parquet_fields;
 
   for (const FieldConstructionArguments& c : cases) {
-    arrow_fields.push_back(std::make_shared<Field>(c.name, c.datatype, false));
+    arrow_fields.push_back(::arrow::field(c.name, c.datatype, false));
     parquet_fields.push_back(PrimitiveNode::Make(c.name, Repetition::REQUIRED,
                                                  c.logical_type, c.physical_type,
                                                  c.physical_length));
   }
 
   ASSERT_OK(ConvertSchema(arrow_fields));
-  ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
+  // Keep the wrapper so fatal assertions inside CheckFlatSchema fail this test.
+  ASSERT_NO_FATAL_FAILURE(CheckFlatSchema(parquet_fields));
 }
 
 TEST_F(TestConvertArrowSchema, ArrowNonconvertibleFields) {
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index 9284c8c..12d4194 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -521,11 +521,19 @@ Status StructToNode(const std::shared_ptr<::arrow::StructType>& type,
 static std::shared_ptr<const LogicalType> TimestampLogicalTypeFromArrowTimestamp(
     const ::arrow::TimestampType& timestamp_type, ::arrow::TimeUnit::type time_unit) {
   const bool utc = !(timestamp_type.timezone().empty());
+  // ARROW-5878(wesm): for forward compatibility reasons, and because
+  // there's no other way to signal to old readers that values are
+  // timestamps, we force the ConvertedType field to be set to the
+  // corresponding TIMESTAMP_* value. This does cause some ambiguity
+  // as Parquet readers have not been consistent about the
+  // interpretation of TIMESTAMP_* values as being UTC-normalized.
   switch (time_unit) {
     case ::arrow::TimeUnit::MILLI:
-      return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MILLIS);
+      return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MILLIS,
+                                    /*force_set_converted_type=*/true);
     case ::arrow::TimeUnit::MICRO:
-      return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MICROS);
+      return LogicalType::Timestamp(utc, LogicalType::TimeUnit::MICROS,
+                                    /*force_set_converted_type=*/true);
     case ::arrow::TimeUnit::NANO:
       return LogicalType::Timestamp(utc, LogicalType::TimeUnit::NANOS);
     case ::arrow::TimeUnit::SECOND:
diff --git a/cpp/src/parquet/schema-test.cc b/cpp/src/parquet/schema-test.cc
index 4e11d1f..badd997 100644
--- a/cpp/src/parquet/schema-test.cc
+++ b/cpp/src/parquet/schema-test.cc
@@ -1079,8 +1079,6 @@ TEST(TestLogicalTypeConstruction, NewTypeIncompatibility) {
       {LogicalType::Time(false, LogicalType::TimeUnit::MICROS), check_is_time},
       {LogicalType::Time(false, LogicalType::TimeUnit::NANOS), check_is_time},
       {LogicalType::Time(true, LogicalType::TimeUnit::NANOS), check_is_time},
-      {LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS), check_is_timestamp},
-      {LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS), check_is_timestamp},
       {LogicalType::Timestamp(false, LogicalType::TimeUnit::NANOS), check_is_timestamp},
       {LogicalType::Timestamp(true, LogicalType::TimeUnit::NANOS), check_is_timestamp},
   };
@@ -1399,23 +1397,29 @@ TEST(TestLogicalTypeOperation, LogicalTypeRepresentation) {
        "Time(isAdjustedToUTC=false, timeUnit=nanoseconds)",
        R"({"Type": "Time", "isAdjustedToUTC": false, "timeUnit": "nanoseconds"})"},
       {LogicalType::Timestamp(true, LogicalType::TimeUnit::MILLIS),
-       "Timestamp(isAdjustedToUTC=true, timeUnit=milliseconds)",
-       R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "milliseconds"})"},
+       "Timestamp(isAdjustedToUTC=true, timeUnit=milliseconds, "
+       "force_set_converted_type=false)",
+       R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "milliseconds", "force_set_converted_type": false})"},
       {LogicalType::Timestamp(true, LogicalType::TimeUnit::MICROS),
-       "Timestamp(isAdjustedToUTC=true, timeUnit=microseconds)",
-       R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "microseconds"})"},
+       "Timestamp(isAdjustedToUTC=true, timeUnit=microseconds, "
+       "force_set_converted_type=false)",
+       R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "microseconds", "force_set_converted_type": false})"},
       {LogicalType::Timestamp(true, LogicalType::TimeUnit::NANOS),
-       "Timestamp(isAdjustedToUTC=true, timeUnit=nanoseconds)",
-       R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "nanoseconds"})"},
-      {LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS),
-       "Timestamp(isAdjustedToUTC=false, timeUnit=milliseconds)",
-       R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "milliseconds"})"},
+       "Timestamp(isAdjustedToUTC=true, timeUnit=nanoseconds, "
+       "force_set_converted_type=false)",
+       R"({"Type": "Timestamp", "isAdjustedToUTC": true, "timeUnit": "nanoseconds", "force_set_converted_type": false})"},
+      {LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS, true),
+       "Timestamp(isAdjustedToUTC=false, timeUnit=milliseconds, "
+       "force_set_converted_type=true)",
+       R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "milliseconds", "force_set_converted_type": true})"},
       {LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS),
-       "Timestamp(isAdjustedToUTC=false, timeUnit=microseconds)",
-       R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "microseconds"})"},
+       "Timestamp(isAdjustedToUTC=false, timeUnit=microseconds, "
+       "force_set_converted_type=false)",
+       R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "microseconds", "force_set_converted_type": false})"},
       {LogicalType::Timestamp(false, LogicalType::TimeUnit::NANOS),
-       "Timestamp(isAdjustedToUTC=false, timeUnit=nanoseconds)",
-       R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "nanoseconds"})"},
+       "Timestamp(isAdjustedToUTC=false, timeUnit=nanoseconds, "
+       "force_set_converted_type=false)",
+       R"({"Type": "Timestamp", "isAdjustedToUTC": false, "timeUnit": "nanoseconds", "force_set_converted_type": false})"},
       {LogicalType::Interval(), "Interval", R"({"Type": "Interval"})"},
       {LogicalType::Int(8, false), "Int(bitWidth=8, isSigned=false)",
        R"({"Type": "Int", "bitWidth": 8, "isSigned": false})"},
@@ -1914,10 +1918,18 @@ TEST_F(TestTemporalSchemaElementConstruction, TemporalCases) {
        Type::INT64, -1, true, ConvertedType::TIMESTAMP_MILLIS, true, check_TIMESTAMP},
       {"timestamp_F_ms", LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS),
        Type::INT64, -1, false, ConvertedType::NA, true, check_TIMESTAMP},
+      {"timestamp_F_ms_force",
+       LogicalType::Timestamp(false, LogicalType::TimeUnit::MILLIS,
+                              /*force_set_converted_type=*/true),
+       Type::INT64, -1, true, ConvertedType::TIMESTAMP_MILLIS, true, check_TIMESTAMP},
       {"timestamp_T_us", LogicalType::Timestamp(true, LogicalType::TimeUnit::MICROS),
        Type::INT64, -1, true, ConvertedType::TIMESTAMP_MICROS, true, check_TIMESTAMP},
       {"timestamp_F_us", LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS),
        Type::INT64, -1, false, ConvertedType::NA, true, check_TIMESTAMP},
+      {"timestamp_F_us_force",
+       LogicalType::Timestamp(false, LogicalType::TimeUnit::MICROS,
+                              /*force_set_converted_type=*/true),
+       Type::INT64, -1, true, ConvertedType::TIMESTAMP_MICROS, true, check_TIMESTAMP},
       {"timestamp_T_ns", LogicalType::Timestamp(true, LogicalType::TimeUnit::NANOS),
        Type::INT64, -1, false, ConvertedType::NA, true, check_TIMESTAMP},
       {"timestamp_F_ns", LogicalType::Timestamp(false, LogicalType::TimeUnit::NANOS),
diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc
index 644e28f..2c75439 100644
--- a/cpp/src/parquet/types.cc
+++ b/cpp/src/parquet/types.cc
@@ -496,9 +496,11 @@ std::shared_ptr<const LogicalType> LogicalType::Time(
 }
 
 std::shared_ptr<const LogicalType> LogicalType::Timestamp(
-    bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit) {
+    bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit,
+    bool force_set_converted_type) {
   DCHECK(time_unit != LogicalType::TimeUnit::UNKNOWN);
-  return TimestampLogicalType::Make(is_adjusted_to_utc, time_unit);
+  return TimestampLogicalType::Make(is_adjusted_to_utc, time_unit,
+                                    force_set_converted_type);
 }
 
 std::shared_ptr<const LogicalType> LogicalType::Interval() {
@@ -1204,14 +1206,19 @@ class LogicalType::Impl::Timestamp final : public LogicalType::Impl::Compatible,
   bool is_adjusted_to_utc() const { return adjusted_; }
   LogicalType::TimeUnit::unit time_unit() const { return unit_; }
 
+  bool force_set_converted_type() const { return force_set_converted_type_; }
+
  private:
-  Timestamp(bool a, LogicalType::TimeUnit::unit u)
+  Timestamp(bool adjusted, LogicalType::TimeUnit::unit unit,
+            bool force_set_converted_type)
       : LogicalType::Impl(LogicalType::Type::TIMESTAMP, SortOrder::SIGNED),
         LogicalType::Impl::SimpleApplicable(parquet::Type::INT64),
-        adjusted_(a),
-        unit_(u) {}
+        adjusted_(adjusted),
+        unit_(unit),
+        force_set_converted_type_(force_set_converted_type) {}
   bool adjusted_ = false;
   LogicalType::TimeUnit::unit unit_;
+  bool force_set_converted_type_ = false;
 };
 
 bool LogicalType::Impl::Timestamp::is_compatible(
@@ -1219,10 +1226,20 @@ bool LogicalType::Impl::Timestamp::is_compatible(
     schema::DecimalMetadata converted_decimal_metadata) const {
   if (converted_decimal_metadata.isset) {
     return false;
-  } else if (adjusted_ && unit_ == LogicalType::TimeUnit::MILLIS) {
-    return converted_type == ConvertedType::TIMESTAMP_MILLIS;
-  } else if (adjusted_ && unit_ == LogicalType::TimeUnit::MICROS) {
-    return converted_type == ConvertedType::TIMESTAMP_MICROS;
+  } else if (unit_ == LogicalType::TimeUnit::MILLIS) {
+    if (adjusted_ || force_set_converted_type_) {
+      return converted_type == ConvertedType::TIMESTAMP_MILLIS;
+    } else {
+      return (converted_type == ConvertedType::NONE) ||
+             (converted_type == ConvertedType::NA);
+    }
+  } else if (unit_ == LogicalType::TimeUnit::MICROS) {
+    if (adjusted_ || force_set_converted_type_) {
+      return converted_type == ConvertedType::TIMESTAMP_MICROS;
+    } else {
+      return (converted_type == ConvertedType::NONE) ||
+             (converted_type == ConvertedType::NA);
+    }
   } else {
     return (converted_type == ConvertedType::NONE) ||
            (converted_type == ConvertedType::NA);
@@ -1232,7 +1249,7 @@ bool LogicalType::Impl::Timestamp::is_compatible(
 ConvertedType::type LogicalType::Impl::Timestamp::ToConvertedType(
     schema::DecimalMetadata* out_decimal_metadata) const {
   reset_decimal_metadata(out_decimal_metadata);
-  if (adjusted_) {
+  if (adjusted_ || force_set_converted_type_) {
     if (unit_ == LogicalType::TimeUnit::MILLIS) {
       return ConvertedType::TIMESTAMP_MILLIS;
     } else if (unit_ == LogicalType::TimeUnit::MICROS) {
@@ -1245,14 +1262,16 @@ ConvertedType::type LogicalType::Impl::Timestamp::ToConvertedType(
 std::string LogicalType::Impl::Timestamp::ToString() const {
   std::stringstream type;
   type << "Timestamp(isAdjustedToUTC=" << std::boolalpha << adjusted_
-       << ", timeUnit=" << time_unit_string(unit_) << ")";
+       << ", timeUnit=" << time_unit_string(unit_)
+       << ", force_set_converted_type=" << force_set_converted_type_ << ")";
   return type.str();
 }
 
 std::string LogicalType::Impl::Timestamp::ToJSON() const {
   std::stringstream json;
   json << R"({"Type": "Timestamp", "isAdjustedToUTC": )" << std::boolalpha << adjusted_
-       << R"(, "timeUnit": ")" << time_unit_string(unit_) << R"("})";
+       << R"(, "timeUnit": ")" << time_unit_string(unit_)
+       << R"(", "force_set_converted_type": )" << force_set_converted_type_ << R"(})";
   return json.str();
 }
 
@@ -1288,13 +1307,14 @@ bool LogicalType::Impl::Timestamp::Equals(const LogicalType& other) const {
 }
 
 std::shared_ptr<const LogicalType> TimestampLogicalType::Make(
-    bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit) {
+    bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit,
+    bool force_set_converted_type) {
   if (time_unit == LogicalType::TimeUnit::MILLIS ||
       time_unit == LogicalType::TimeUnit::MICROS ||
       time_unit == LogicalType::TimeUnit::NANOS) {
     auto* logical_type = new TimestampLogicalType();
-    logical_type->impl_.reset(
-        new LogicalType::Impl::Timestamp(is_adjusted_to_utc, time_unit));
+    logical_type->impl_.reset(new LogicalType::Impl::Timestamp(
+        is_adjusted_to_utc, time_unit, force_set_converted_type));
     return std::shared_ptr<const LogicalType>(logical_type);
   } else {
     throw ParquetException(
@@ -1310,6 +1330,11 @@ LogicalType::TimeUnit::unit TimestampLogicalType::time_unit() const {
   return (dynamic_cast<const LogicalType::Impl::Timestamp&>(*impl_)).time_unit();
 }
 
+bool TimestampLogicalType::force_set_converted_type() const {
+  return (dynamic_cast<const LogicalType::Impl::Timestamp&>(*impl_))
+      .force_set_converted_type();
+}
+
 class LogicalType::Impl::Interval final : public LogicalType::Impl::SimpleCompatible,
                                           public LogicalType::Impl::TypeLengthApplicable {
  public:
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index 0bfaf99..6f1906f 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -182,8 +182,17 @@ class PARQUET_EXPORT LogicalType {
   static std::shared_ptr<const LogicalType> Date();
   static std::shared_ptr<const LogicalType> Time(bool is_adjusted_to_utc,
                                                  LogicalType::TimeUnit::unit time_unit);
+
+  /// \brief Create a Timestamp logical type
+  /// \param[in] is_adjusted_to_utc set true if the data is UTC-normalized
+  /// \param[in] time_unit the resolution of the timestamp
+  /// \param[in] force_set_converted_type if true, always set the
+  /// legacy ConvertedType TIMESTAMP_MICROS and TIMESTAMP_MILLIS
+  /// metadata. Default is false
   static std::shared_ptr<const LogicalType> Timestamp(
-      bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit);
+      bool is_adjusted_to_utc, LogicalType::TimeUnit::unit time_unit,
+      bool force_set_converted_type = false);
+
   static std::shared_ptr<const LogicalType> Interval();
   static std::shared_ptr<const LogicalType> Int(int bit_width, bool is_signed);
   static std::shared_ptr<const LogicalType> Null();
@@ -337,10 +346,15 @@ class PARQUET_EXPORT TimeLogicalType : public LogicalType {
 class PARQUET_EXPORT TimestampLogicalType : public LogicalType {
  public:
   static std::shared_ptr<const LogicalType> Make(bool is_adjusted_to_utc,
-                                                 LogicalType::TimeUnit::unit time_unit);
+                                                 LogicalType::TimeUnit::unit time_unit,
+                                                 bool force_set_converted_type = false);
   bool is_adjusted_to_utc() const;
   LogicalType::TimeUnit::unit time_unit() const;
 
+  /// \brief If true, will set ConvertedType for micros and millis
+  /// resolution in legacy ConvertedType Thrift metadata
+  bool force_set_converted_type() const;
+
  private:
   TimestampLogicalType() = default;
 };