Posted to commits@spark.apache.org by gu...@apache.org on 2020/03/20 10:09:54 UTC

[spark] branch branch-3.0 updated: [SPARK-31183][SQL][FOLLOWUP] Move rebase tests to `AvroSuite` and check the rebase flag out of function bodies

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a6f3e3b  [SPARK-31183][SQL][FOLLOWUP] Move rebase tests to `AvroSuite` and check the rebase flag out of function bodies
a6f3e3b is described below

commit a6f3e3b096e2d7a39e0b2fdec6452e6d633baf7e
Author: Maxim Gekk <ma...@gmail.com>
AuthorDate: Fri Mar 20 19:02:54 2020 +0900

    [SPARK-31183][SQL][FOLLOWUP] Move rebase tests to `AvroSuite` and check the rebase flag out of function bodies
    
    ### What changes were proposed in this pull request?
    1. The tests added by #27953 are moved from `AvroLogicalTypeSuite` to `AvroSuite`.
    2. Checking of the `rebaseDateTime` flag is moved out of the function bodies.

    ### Why are the changes needed?
    1. The tests are moved because they are not directly related to logical types.
    2. Checking the flag outside of the function bodies should improve performance, since the branch is resolved once per converter instead of once per value.

    ### Does this PR introduce any user-facing change?
    No

    ### How was this patch tested?
    By running the Avro tests via the command `build/sbt avro/test`.
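
    For a single suite, sbt's standard testOnly task can narrow the run, e.g.
    (the selector below is an assumption, not part of the PR):

        build/sbt "avro/testOnly org.apache.spark.sql.avro.AvroV1Suite"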
    
    Closes #27964 from MaxGekk/rebase-avro-datetime-followup.
    
    Authored-by: Maxim Gekk <ma...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  21 ++--
 .../org/apache/spark/sql/avro/AvroSerializer.scala |  18 ++-
 .../spark/sql/avro/AvroLogicalTypeSuite.scala      |  98 +---------------
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 124 ++++++++++++++++++---
 4 files changed, 131 insertions(+), 130 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index b98f303..3e8a7f9 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -106,21 +106,22 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
       case (LONG, TimestampType) => avroType.getLogicalType match {
         // For backward compatibility, if the Avro type is Long and it is not logical type
         // (the `null` case), the value is processed as timestamp type with millisecond precision.
+        case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) =>
+          val millis = value.asInstanceOf[Long]
+          val micros = DateTimeUtils.fromMillis(millis)
+          val rebasedMicros = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
+          updater.setLong(ordinal, rebasedMicros)
         case null | _: TimestampMillis => (updater, ordinal, value) =>
           val millis = value.asInstanceOf[Long]
           val micros = DateTimeUtils.fromMillis(millis)
-          if (rebaseDateTime) {
-            updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
-          } else {
-            updater.setLong(ordinal, micros)
-          }
+          updater.setLong(ordinal, micros)
+        case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) =>
+          val micros = value.asInstanceOf[Long]
+          val rebasedMicros = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
+          updater.setLong(ordinal, rebasedMicros)
         case _: TimestampMicros => (updater, ordinal, value) =>
           val micros = value.asInstanceOf[Long]
-          if (rebaseDateTime) {
-            updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
-          } else {
-            updater.setLong(ordinal, micros)
-          }
+          updater.setLong(ordinal, micros)
         case other => throw new IncompatibleSchemaException(
           s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
       }
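
The change above is a hoisting refactor: the `rebaseDateTime` check moves into
the pattern-match guards, so the branch is resolved once when the converter is
built instead of on every deserialized value. A minimal standalone sketch of
the pattern, with hypothetical names (`Updater`, `newWriter`, and a placeholder
`rebase`) standing in for Spark's internals:

    object RebaseWriterSketch {
      // Hypothetical stand-in for Spark's internal row updater.
      type Updater = (Int, Long) => Unit

      // Placeholder for DateTimeUtils.rebaseJulianToGregorianMicros.
      def rebase(micros: Long): Long = micros

      // The flag is consulted once, while the writer is built; the closure
      // that runs per record carries no branch at all.
      def newWriter(rebaseDateTime: Boolean): (Updater, Int, Long) => Unit =
        if (rebaseDateTime) {
          (updater, ordinal, value) => updater(ordinal, rebase(value))
        } else {
          (updater, ordinal, value) => updater(ordinal, value)
        }
    }
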
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index af9e3a5..68df7c0 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -149,17 +149,15 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
       case (TimestampType, LONG) => avroType.getLogicalType match {
           // For backward compatibility, if the Avro type is Long and it is not logical type
           // (the `null` case), output the timestamp value as with millisecond precision.
-          case null | _: TimestampMillis => (getter, ordinal) =>
-            val micros = getter.getLong(ordinal)
-            val rebasedMicros = if (rebaseDateTime) {
-              DateTimeUtils.rebaseGregorianToJulianMicros(micros)
-            } else micros
-            DateTimeUtils.toMillis(rebasedMicros)
-          case _: TimestampMicros => (getter, ordinal) =>
+          case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) =>
             val micros = getter.getLong(ordinal)
-            if (rebaseDateTime) {
-              DateTimeUtils.rebaseGregorianToJulianMicros(micros)
-            } else micros
+            val rebasedMicros = DateTimeUtils.rebaseGregorianToJulianMicros(micros)
+            DateTimeUtils.toMillis(rebasedMicros)
+          case null | _: TimestampMillis => (getter, ordinal) =>
+            DateTimeUtils.toMillis(getter.getLong(ordinal))
+          case _: TimestampMicros if rebaseDateTime => (getter, ordinal) =>
+            DateTimeUtils.rebaseGregorianToJulianMicros(getter.getLong(ordinal))
+          case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
           case other => throw new IncompatibleSchemaException(
             s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
         }
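
Note the direction of each conversion: the deserializer widens Avro
milliseconds to Catalyst microseconds with `fromMillis`, while the serializer
narrows microseconds back to milliseconds with `toMillis`. A REPL-style sketch
of the assumed semantics (not the actual DateTimeUtils source), where floor
division keeps pre-1970 values correct:

    def toMillis(micros: Long): Long = Math.floorDiv(micros, 1000L)        // micros -> millis
    def fromMillis(millis: Long): Long = Math.multiplyExact(millis, 1000L) // millis -> micros

    // -1 us lies within 1969-12-31 23:59:59.999, so it must floor to -1 ms:
    assert(toMillis(-1L) == -1L)
    // Round-tripping truncates to millisecond precision:
    assert(fromMillis(toMillis(123456L)) == 123000L)
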
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
index 9e89b69..8256965 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.avro
 
 import java.io.File
-import java.sql.{Date, Timestamp}
+import java.sql.Timestamp
 
 import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Conversions.DecimalConversion
@@ -25,7 +25,7 @@ import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -348,100 +348,6 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
       assert(msg.contains("Unscaled value too large for precision"))
     }
   }
-
-  private def readResourceAvroFile(name: String): DataFrame = {
-    val url = Thread.currentThread().getContextClassLoader.getResource(name)
-    spark.read.format("avro").load(url.toString)
-  }
-
-  test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
-    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-      checkAnswer(
-        readResourceAvroFile("before_1582_date_v2_4.avro"),
-        Row(java.sql.Date.valueOf("1001-01-01")))
-      checkAnswer(
-        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
-      checkAnswer(
-        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
-    }
-  }
-
-  test("SPARK-31183: rebasing microseconds timestamps in write") {
-    val tsStr = "1001-01-01 01:02:03.123456"
-    val nonRebased = "1001-01-07 01:09:05.123456"
-    withTempPath { dir =>
-      val path = dir.getAbsolutePath
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-        Seq(tsStr).toDF("tsS")
-          .select($"tsS".cast("timestamp").as("ts"))
-          .write.format("avro")
-          .save(path)
-
-        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
-      }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
-        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
-      }
-    }
-  }
-
-  test("SPARK-31183: rebasing milliseconds timestamps in write") {
-    val tsStr = "1001-01-01 01:02:03.123456"
-    val rebased = "1001-01-01 01:02:03.123"
-    val nonRebased = "1001-01-07 01:09:05.123"
-    Seq(
-      """{"type": "long","logicalType": "timestamp-millis"}""",
-      """"long"""").foreach { tsType =>
-      val timestampSchema = s"""
-          |{
-          |  "namespace": "logical",
-          |  "type": "record",
-          |  "name": "test",
-          |  "fields": [
-          |    {"name": "ts", "type": $tsType}
-          |  ]
-          |}""".stripMargin
-      withTempPath { dir =>
-        val path = dir.getAbsolutePath
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-          Seq(tsStr).toDF("tsS")
-            .select($"tsS".cast("timestamp").as("ts"))
-            .write
-            .option("avroSchema", timestampSchema)
-            .format("avro")
-            .save(path)
-
-          checkAnswer(
-            spark.read.schema("ts timestamp").format("avro").load(path),
-            Row(Timestamp.valueOf(rebased)))
-        }
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
-          checkAnswer(
-            spark.read.schema("ts timestamp").format("avro").load(path),
-            Row(Timestamp.valueOf(nonRebased)))
-        }
-      }
-    }
-  }
-
-  test("SPARK-31183: rebasing dates in write") {
-    withTempPath { dir =>
-      val path = dir.getAbsolutePath
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-        Seq("1001-01-01").toDF("dateS")
-          .select($"dateS".cast("date").as("date"))
-          .write.format("avro")
-          .save(path)
-
-        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
-      }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
-        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
-      }
-    }
-  }
 }
 
 class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite {
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 360160c..34a0e2b 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.net.URL
 import java.nio.file.{Files, Paths}
 import java.sql.{Date, Timestamp}
-import java.util.{Locale, TimeZone, UUID}
+import java.util.{Locale, UUID}
 
 import scala.collection.JavaConverters._
 
@@ -35,9 +35,10 @@ import org.apache.commons.io.FileUtils
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql._
-import org.apache.spark.sql.TestingUDT.{IntervalData, NullData, NullUDT}
+import org.apache.spark.sql.TestingUDT.IntervalData
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -83,6 +84,11 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
     }, new GenericDatumReader[Any]()).getSchema.toString(false)
   }
 
+  private def readResourceAvroFile(name: String): DataFrame = {
+    val url = Thread.currentThread().getContextClassLoader.getResource(name)
+    spark.read.format("avro").load(url.toString)
+  }
+
   test("resolve avro data source") {
     val databricksAvro = "com.databricks.spark.avro"
     // By default the backward compatibility for com.databricks.spark.avro is enabled.
@@ -402,18 +408,19 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
         StructField("float", FloatType, true),
         StructField("date", DateType, true)
       ))
-      TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
-      val rdd = spark.sparkContext.parallelize(Seq(
-        Row(1f, null),
-        Row(2f, new Date(1451948400000L)),
-        Row(3f, new Date(1460066400500L))
-      ))
-      val df = spark.createDataFrame(rdd, schema)
-      df.write.format("avro").save(dir.toString)
-      assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
-      checkAnswer(
-        spark.read.format("avro").load(dir.toString).select("date"),
-        Seq(Row(null), Row(new Date(1451865600000L)), Row(new Date(1459987200000L))))
+      DateTimeTestUtils.withDefaultTimeZone(DateTimeUtils.TimeZoneUTC) {
+        val rdd = spark.sparkContext.parallelize(Seq(
+          Row(1f, null),
+          Row(2f, new Date(1451948400000L)),
+          Row(3f, new Date(1460066400500L))
+        ))
+        val df = spark.createDataFrame(rdd, schema)
+        df.write.format("avro").save(dir.toString)
+        assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
+        checkAnswer(
+          spark.read.format("avro").load(dir.toString).select("date"),
+          Seq(Row(null), Row(new Date(1451865600000L)), Row(new Date(1459987200000L))))
+      }
     }
   }
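
Using `DateTimeTestUtils.withDefaultTimeZone` instead of a bare
`TimeZone.setDefault` restores the JVM default zone even if the test body
throws, so the UTC default no longer leaks into later tests. A minimal sketch
of such a helper (the actual DateTimeTestUtils implementation may differ):

    import java.util.TimeZone

    def withDefaultTimeZone[T](tz: TimeZone)(body: => T): T = {
      val saved = TimeZone.getDefault
      TimeZone.setDefault(tz)
      try body
      finally TimeZone.setDefault(saved) // restore even when `body` throws
    }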
 
@@ -1521,6 +1528,95 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
       assert(deprecatedEvents.size === 1)
     }
   }
+
+  test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
+    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+      checkAnswer(
+        readResourceAvroFile("before_1582_date_v2_4.avro"),
+        Row(java.sql.Date.valueOf("1001-01-01")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
+    }
+  }
+
+  test("SPARK-31183: rebasing microseconds timestamps in write") {
+    val tsStr = "1001-01-01 01:02:03.123456"
+    val nonRebased = "1001-01-07 01:09:05.123456"
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+        Seq(tsStr).toDF("tsS")
+          .select($"tsS".cast("timestamp").as("ts"))
+          .write.format("avro")
+          .save(path)
+
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
+      }
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
+      }
+    }
+  }
+
+  test("SPARK-31183: rebasing milliseconds timestamps in write") {
+    val tsStr = "1001-01-01 01:02:03.123456"
+    val rebased = "1001-01-01 01:02:03.123"
+    val nonRebased = "1001-01-07 01:09:05.123"
+    Seq(
+      """{"type": "long","logicalType": "timestamp-millis"}""",
+      """"long"""").foreach { tsType =>
+      val timestampSchema = s"""
+        |{
+        |  "namespace": "logical",
+        |  "type": "record",
+        |  "name": "test",
+        |  "fields": [
+        |    {"name": "ts", "type": $tsType}
+        |  ]
+        |}""".stripMargin
+      withTempPath { dir =>
+        val path = dir.getAbsolutePath
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+          Seq(tsStr).toDF("tsS")
+            .select($"tsS".cast("timestamp").as("ts"))
+            .write
+            .option("avroSchema", timestampSchema)
+            .format("avro")
+            .save(path)
+
+          checkAnswer(
+            spark.read.schema("ts timestamp").format("avro").load(path),
+            Row(Timestamp.valueOf(rebased)))
+        }
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+          checkAnswer(
+            spark.read.schema("ts timestamp").format("avro").load(path),
+            Row(Timestamp.valueOf(nonRebased)))
+        }
+      }
+    }
+  }
+
+  test("SPARK-31183: rebasing dates in write") {
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+        Seq("1001-01-01").toDF("dateS")
+          .select($"dateS".cast("date").as("date"))
+          .write.format("avro")
+          .save(path)
+
+        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
+      }
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
+      }
+    }
+  }
 }
 
 class AvroV1Suite extends AvroSuite {
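
On the expected values in the rebase tests: the day labelled 1001-01-01 in the
Julian calendar (Spark 2.4's storage calendar) is the day labelled 1001-01-07
in the proleptic Gregorian calendar (Spark 3.0's), which is where the six-day
shift in the non-rebased answers comes from. A REPL-style sketch using only JDK
classes, assuming the JDK's hybrid GregorianCalendar mirrors the 2.4 behaviour:

    import java.time.LocalDate
    import java.util.{Calendar, GregorianCalendar, TimeZone}

    // Epoch day of 1001-01-01 in the hybrid Julian/Gregorian calendar
    // underlying java.sql.Date.
    val cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
    cal.clear()
    cal.set(1001, Calendar.JANUARY, 1)
    val julianDay = Math.floorDiv(cal.getTimeInMillis, 86400000L)

    // Epoch day of 1001-01-01 in the proleptic Gregorian calendar.
    val gregorianDay = LocalDate.of(1001, 1, 1).toEpochDay

    println(julianDay - gregorianDay) // 6 -> reads back as 1001-01-07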


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org