Posted to commits@spark.apache.org by do...@apache.org on 2020/03/22 03:57:21 UTC

[spark] branch branch-3.0 updated: Revert "[SPARK-31183][SQL][FOLLOWUP] Move rebase tests to `AvroSuite` and check the rebase flag out of function bodies"

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f0bfdc5  Revert "[SPARK-31183][SQL][FOLLOWUP] Move rebase tests to `AvroSuite` and check the rebase flag out of function bodies"
f0bfdc5 is described below

commit f0bfdc513a15884de8f3ffc79cc1845991082642
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Sat Mar 21 20:52:04 2020 -0700

    Revert "[SPARK-31183][SQL][FOLLOWUP] Move rebase tests to `AvroSuite` and check the rebase flag out of function bodies"
    
    This reverts commit a6f3e3b096e2d7a39e0b2fdec6452e6d633baf7e.
    
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  21 ++--
 .../org/apache/spark/sql/avro/AvroSerializer.scala |  18 +--
 .../spark/sql/avro/AvroLogicalTypeSuite.scala      |  98 +++++++++++++++-
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 124 +++------------------
 4 files changed, 130 insertions(+), 131 deletions(-)
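
This revert is largely structural: the follow-up had hoisted the `rebaseDateTime` check into pattern guards, so the branch was taken once when the converter closure was built, and the revert moves the check back inside the per-value function bodies. A minimal sketch of the two shapes, with hypothetical stand-in names (not the Spark code itself):

    object DispatchSketch {
      type Converter = Long => Long
      val rebaseDateTime: Boolean = true       // stand-in for the SQL conf flag
      def rebase(micros: Long): Long = micros  // stand-in for the real rebasing

      // Follow-up style (now reverted): branch once, outside the function body.
      val hoisted: Converter =
        if (rebaseDateTime) { micros => rebase(micros) }
        else { micros => micros }

      // Style restored by this revert: branch on every converted value.
      val inBody: Converter = micros =>
        if (rebaseDateTime) rebase(micros) else micros
    }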

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 3e8a7f9..b98f303 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -106,22 +106,21 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
       case (LONG, TimestampType) => avroType.getLogicalType match {
         // For backward compatibility, if the Avro type is Long and it is not logical type
         // (the `null` case), the value is processed as timestamp type with millisecond precision.
-        case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) =>
-          val millis = value.asInstanceOf[Long]
-          val micros = DateTimeUtils.fromMillis(millis)
-          val rebasedMicros = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
-          updater.setLong(ordinal, rebasedMicros)
         case null | _: TimestampMillis => (updater, ordinal, value) =>
           val millis = value.asInstanceOf[Long]
           val micros = DateTimeUtils.fromMillis(millis)
-          updater.setLong(ordinal, micros)
-        case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) =>
-          val micros = value.asInstanceOf[Long]
-          val rebasedMicros = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
-          updater.setLong(ordinal, rebasedMicros)
+          if (rebaseDateTime) {
+            updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
+          } else {
+            updater.setLong(ordinal, micros)
+          }
         case _: TimestampMicros => (updater, ordinal, value) =>
           val micros = value.asInstanceOf[Long]
-          updater.setLong(ordinal, micros)
+          if (rebaseDateTime) {
+            updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
+          } else {
+            updater.setLong(ordinal, micros)
+          }
         case other => throw new IncompatibleSchemaException(
           s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
       }
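
On the read path both restored branches first widen the Avro long to Catalyst's microsecond precision before the optional Julian-to-Gregorian rebase. As a rough model (an assumption about the implementation, not a copy of it), `DateTimeUtils.fromMillis` is a plain unit conversion:

    // Approximate model of the read-side widening; the real method lives in
    // org.apache.spark.sql.catalyst.util.DateTimeUtils and may differ in detail.
    object FromMillisSketch {
      private val MicrosPerMillis = 1000L
      def fromMillis(millis: Long): Long = Math.multiplyExact(millis, MicrosPerMillis)
    }
    // FromMillisSketch.fromMillis(1234L) == 1234000L
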
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 68df7c0..af9e3a5 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -149,15 +149,17 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
       case (TimestampType, LONG) => avroType.getLogicalType match {
           // For backward compatibility, if the Avro type is Long and it is not logical type
           // (the `null` case), output the timestamp value as with millisecond precision.
-          case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) =>
-            val micros = getter.getLong(ordinal)
-            val rebasedMicros = DateTimeUtils.rebaseGregorianToJulianMicros(micros)
-            DateTimeUtils.toMillis(rebasedMicros)
           case null | _: TimestampMillis => (getter, ordinal) =>
-            DateTimeUtils.toMillis(getter.getLong(ordinal))
-          case _: TimestampMicros if rebaseDateTime => (getter, ordinal) =>
-            DateTimeUtils.rebaseGregorianToJulianMicros(getter.getLong(ordinal))
-          case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
+            val micros = getter.getLong(ordinal)
+            val rebasedMicros = if (rebaseDateTime) {
+              DateTimeUtils.rebaseGregorianToJulianMicros(micros)
+            } else micros
+            DateTimeUtils.toMillis(rebasedMicros)
+          case _: TimestampMicros => (getter, ordinal) =>
+            val micros = getter.getLong(ordinal)
+            if (rebaseDateTime) {
+              DateTimeUtils.rebaseGregorianToJulianMicros(micros)
+            } else micros
           case other => throw new IncompatibleSchemaException(
             s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
         }
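
The write path goes the other way: Catalyst microseconds are optionally rebased to the hybrid Julian calendar and, for `timestamp-millis` output, narrowed to milliseconds, which drops sub-millisecond precision. A hedged sketch of that narrowing (assumed floor semantics, not the actual implementation):

    // Approximate model of DateTimeUtils.toMillis: floor division, so that
    // pre-epoch (negative) microsecond values still round toward minus infinity.
    object ToMillisSketch {
      private val MicrosPerMillis = 1000L
      def toMillis(micros: Long): Long = Math.floorDiv(micros, MicrosPerMillis)
    }
    // ToMillisSketch.toMillis(1234567L) == 1234L
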
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
index 8256965..9e89b69 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.avro
 
 import java.io.File
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
 
 import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Conversions.DecimalConversion
@@ -25,7 +25,7 @@ import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -348,6 +348,100 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
       assert(msg.contains("Unscaled value too large for precision"))
     }
   }
+
+  private def readResourceAvroFile(name: String): DataFrame = {
+    val url = Thread.currentThread().getContextClassLoader.getResource(name)
+    spark.read.format("avro").load(url.toString)
+  }
+
+  test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
+    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+      checkAnswer(
+        readResourceAvroFile("before_1582_date_v2_4.avro"),
+        Row(java.sql.Date.valueOf("1001-01-01")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
+    }
+  }
+
+  test("SPARK-31183: rebasing microseconds timestamps in write") {
+    val tsStr = "1001-01-01 01:02:03.123456"
+    val nonRebased = "1001-01-07 01:09:05.123456"
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+        Seq(tsStr).toDF("tsS")
+          .select($"tsS".cast("timestamp").as("ts"))
+          .write.format("avro")
+          .save(path)
+
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
+      }
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
+      }
+    }
+  }
+
+  test("SPARK-31183: rebasing milliseconds timestamps in write") {
+    val tsStr = "1001-01-01 01:02:03.123456"
+    val rebased = "1001-01-01 01:02:03.123"
+    val nonRebased = "1001-01-07 01:09:05.123"
+    Seq(
+      """{"type": "long","logicalType": "timestamp-millis"}""",
+      """"long"""").foreach { tsType =>
+      val timestampSchema = s"""
+          |{
+          |  "namespace": "logical",
+          |  "type": "record",
+          |  "name": "test",
+          |  "fields": [
+          |    {"name": "ts", "type": $tsType}
+          |  ]
+          |}""".stripMargin
+      withTempPath { dir =>
+        val path = dir.getAbsolutePath
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+          Seq(tsStr).toDF("tsS")
+            .select($"tsS".cast("timestamp").as("ts"))
+            .write
+            .option("avroSchema", timestampSchema)
+            .format("avro")
+            .save(path)
+
+          checkAnswer(
+            spark.read.schema("ts timestamp").format("avro").load(path),
+            Row(Timestamp.valueOf(rebased)))
+        }
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+          checkAnswer(
+            spark.read.schema("ts timestamp").format("avro").load(path),
+            Row(Timestamp.valueOf(nonRebased)))
+        }
+      }
+    }
+  }
+
+  test("SPARK-31183: rebasing dates in write") {
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+        Seq("1001-01-01").toDF("dateS")
+          .select($"dateS".cast("date").as("date"))
+          .write.format("avro")
+          .save(path)
+
+        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
+      }
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
+      }
+    }
+  }
 }
 
 class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite {
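
The moved tests all pivot on the same fact: for dates around the year 1001 the Julian and proleptic Gregorian calendars disagree by six days, which is why the suites expect 1001-01-01 with rebasing on and 1001-01-07 with it off. A self-contained illustration of that shift using only the JDK (not Spark's rebasing code): java.util.GregorianCalendar is a hybrid Julian/Gregorian calendar, as Spark 2.4's internals were, while java.time.LocalDate is proleptic Gregorian, as Spark 3.0's are.

    import java.time.LocalDate
    import java.util.{Calendar, GregorianCalendar, TimeZone}

    object CalendarShiftDemo extends App {
      val hybrid = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
      hybrid.clear()
      hybrid.set(1001, Calendar.JANUARY, 1)    // Julian 1001-01-01
      val epochDay = Math.floorDiv(hybrid.getTimeInMillis, 86400000L)
      println(LocalDate.ofEpochDay(epochDay))  // prints 1001-01-07
    }
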
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 34a0e2b..360160c 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.net.URL
 import java.nio.file.{Files, Paths}
 import java.sql.{Date, Timestamp}
-import java.util.{Locale, UUID}
+import java.util.{Locale, TimeZone, UUID}
 
 import scala.collection.JavaConverters._
 
@@ -35,10 +35,9 @@ import org.apache.commons.io.FileUtils
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql._
-import org.apache.spark.sql.TestingUDT.IntervalData
+import org.apache.spark.sql.TestingUDT.{IntervalData, NullData, NullUDT}
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.Filter
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -84,11 +83,6 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
     }, new GenericDatumReader[Any]()).getSchema.toString(false)
   }
 
-  private def readResourceAvroFile(name: String): DataFrame = {
-    val url = Thread.currentThread().getContextClassLoader.getResource(name)
-    spark.read.format("avro").load(url.toString)
-  }
-
   test("resolve avro data source") {
     val databricksAvro = "com.databricks.spark.avro"
     // By default the backward compatibility for com.databricks.spark.avro is enabled.
@@ -408,19 +402,18 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
         StructField("float", FloatType, true),
         StructField("date", DateType, true)
       ))
-      DateTimeTestUtils.withDefaultTimeZone(DateTimeUtils.TimeZoneUTC) {
-        val rdd = spark.sparkContext.parallelize(Seq(
-          Row(1f, null),
-          Row(2f, new Date(1451948400000L)),
-          Row(3f, new Date(1460066400500L))
-        ))
-        val df = spark.createDataFrame(rdd, schema)
-        df.write.format("avro").save(dir.toString)
-        assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
-        checkAnswer(
-          spark.read.format("avro").load(dir.toString).select("date"),
-          Seq(Row(null), Row(new Date(1451865600000L)), Row(new Date(1459987200000L))))
-      }
+      TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
+      val rdd = spark.sparkContext.parallelize(Seq(
+        Row(1f, null),
+        Row(2f, new Date(1451948400000L)),
+        Row(3f, new Date(1460066400500L))
+      ))
+      val df = spark.createDataFrame(rdd, schema)
+      df.write.format("avro").save(dir.toString)
+      assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
+      checkAnswer(
+        spark.read.format("avro").load(dir.toString).select("date"),
+        Seq(Row(null), Row(new Date(1451865600000L)), Row(new Date(1459987200000L))))
     }
   }
 
@@ -1528,95 +1521,6 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
       assert(deprecatedEvents.size === 1)
     }
   }
-
-  test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
-    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-      checkAnswer(
-        readResourceAvroFile("before_1582_date_v2_4.avro"),
-        Row(java.sql.Date.valueOf("1001-01-01")))
-      checkAnswer(
-        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
-      checkAnswer(
-        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
-    }
-  }
-
-  test("SPARK-31183: rebasing microseconds timestamps in write") {
-    val tsStr = "1001-01-01 01:02:03.123456"
-    val nonRebased = "1001-01-07 01:09:05.123456"
-    withTempPath { dir =>
-      val path = dir.getAbsolutePath
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-        Seq(tsStr).toDF("tsS")
-          .select($"tsS".cast("timestamp").as("ts"))
-          .write.format("avro")
-          .save(path)
-
-        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
-      }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
-        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
-      }
-    }
-  }
-
-  test("SPARK-31183: rebasing milliseconds timestamps in write") {
-    val tsStr = "1001-01-01 01:02:03.123456"
-    val rebased = "1001-01-01 01:02:03.123"
-    val nonRebased = "1001-01-07 01:09:05.123"
-    Seq(
-      """{"type": "long","logicalType": "timestamp-millis"}""",
-      """"long"""").foreach { tsType =>
-      val timestampSchema = s"""
-        |{
-        |  "namespace": "logical",
-        |  "type": "record",
-        |  "name": "test",
-        |  "fields": [
-        |    {"name": "ts", "type": $tsType}
-        |  ]
-        |}""".stripMargin
-      withTempPath { dir =>
-        val path = dir.getAbsolutePath
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-          Seq(tsStr).toDF("tsS")
-            .select($"tsS".cast("timestamp").as("ts"))
-            .write
-            .option("avroSchema", timestampSchema)
-            .format("avro")
-            .save(path)
-
-          checkAnswer(
-            spark.read.schema("ts timestamp").format("avro").load(path),
-            Row(Timestamp.valueOf(rebased)))
-        }
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
-          checkAnswer(
-            spark.read.schema("ts timestamp").format("avro").load(path),
-            Row(Timestamp.valueOf(nonRebased)))
-        }
-      }
-    }
-  }
-
-  test("SPARK-31183: rebasing dates in write") {
-    withTempPath { dir =>
-      val path = dir.getAbsolutePath
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-        Seq("1001-01-01").toDF("dateS")
-          .select($"dateS".cast("date").as("date"))
-          .write.format("avro")
-          .save(path)
-
-        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
-      }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
-        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
-      }
-    }
-  }
 }
 
 class AvroV1Suite extends AvroSuite {
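
Two reading notes on the AvroSuite side. First, the revert swaps the scoped DateTimeTestUtils.withDefaultTimeZone helper back for a bare TimeZone.setDefault call, which leaves the new default in place after the test; the reverted helper presumably follows the usual save/restore shape (a sketch, not the actual utility):

    def withDefaultTimeZone[T](tz: java.util.TimeZone)(body: => T): T = {
      val saved = java.util.TimeZone.getDefault
      java.util.TimeZone.setDefault(tz)
      try body finally java.util.TimeZone.setDefault(saved)
    }

Second, the restored SPARK-31183 tests all follow one toggle pattern: write old dates or timestamps with the legacy rebase flag on, then read them back with the flag both on and off. A hedged spark-shell sketch of that round trip (assumes a writable /tmp/rebase_demo path; the conf key is resolved via SQLConf exactly as in the tests):

    import org.apache.spark.sql.internal.SQLConf
    import spark.implicits._

    val key = SQLConf.LEGACY_AVRO_REBASE_DATETIME.key

    spark.conf.set(key, "true")  // write in the Spark 2.4 (hybrid Julian) form
    Seq("1001-01-01").toDF("s")
      .selectExpr("CAST(s AS DATE) AS date")
      .write.format("avro").save("/tmp/rebase_demo")

    spark.conf.set(key, "false")  // read without rebasing
    spark.read.format("avro").load("/tmp/rebase_demo").show()
    // The stored Julian day surfaces as 1001-01-07 in the proleptic Gregorian
    // calendar, matching the `nonRebased` values in the suites above.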

