You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/05/29 05:21:47 UTC

[spark] branch branch-3.0 updated: [SPARK-31855][SQL][TESTS] Check reading date/timestamp from Avro files w/ and w/o Spark version

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d7e6e08  [SPARK-31855][SQL][TESTS] Check reading date/timestamp from Avro files w/ and w/o Spark version
d7e6e08 is described below

commit d7e6e08f34a8a63027cb1651f8d225454f494fc7
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Fri May 29 05:18:37 2020 +0000

    [SPARK-31855][SQL][TESTS] Check reading date/timestamp from Avro files w/ and w/o Spark version
    
    ### What changes were proposed in this pull request?
    1. Add the following Avro files to the resource folder `external/avro/src/test/resources`:
       - Files saved by Spark 2.4.5 (https://github.com/apache/spark/commit/cee4ecbb16917fa85f02c635925e2687400aa56b) without meta info `org.apache.spark.version`
          - `before_1582_date_v2_4_5.avro` with a date column: `avro.schema	{"type":"record","name":"topLevelRecord","fields":[{"name":"dt","type":[{"type":"int","logicalType":"date"},"null"]}]}`
          - `before_1582_timestamp_millis_v2_4_5.avro` with a timestamp column: `avro.schema	{"type":"record","name":"test","namespace":"logical","fields":[{"name":"dt","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null}]}`
          - `before_1582_timestamp_micros_v2_4_5.avro` with a timestamp column: `avro.schema	{"type":"record","name":"topLevelRecord","fields":[{"name":"dt","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]}]}`
        - Files saved by Spark 2.4.6-rc3 (https://github.com/apache/spark/commit/570848da7c48ba0cb827ada997e51677ff672a39) with the meta info `org.apache.spark.version	2.4.6`:
          - `before_1582_date_v2_4_6.avro` is similar to `before_1582_date_v2_4_5.avro` except for the Spark version in the Avro file metadata.
          - `before_1582_timestamp_micros_v2_4_6.avro` is similar to `before_1582_timestamp_micros_v2_4_5.avro` except for the Spark version in the metadata.
          - `before_1582_timestamp_millis_v2_4_6.avro` is similar to `before_1582_timestamp_millis_v2_4_5.avro` except for the Spark version in the metadata.
    2. Remove a few Avro files because they are replaced by the Avro files generated by Spark 2.4.5 above.
    3. Add the new test "SPARK-31855: generate test files for checking compatibility with Spark 2.4" to `AvroSuite` (marked as ignored). The Avro files above were generated by this test.
    4. Modify "SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps" in `AvroSuite` to use the new Avro files.
    
    ### Why are the changes needed?
    To improve test coverage.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    By `AvroV1Suite` and `AvroV2Suite`.
    
    Closes #28664 from MaxGekk/avro-update-resource-files.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 37a1fb8d089190929b77421fbb449bf0ed1ab25e)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../src/test/resources/before_1582_date_v2_4.avro  | Bin 202 -> 0 bytes
 .../test/resources/before_1582_date_v2_4_5.avro    | Bin 0 -> 200 bytes
 .../test/resources/before_1582_date_v2_4_6.avro    | Bin 0 -> 231 bytes
 .../before_1582_timestamp_micros_v2_4_5.avro       | Bin 0 -> 218 bytes
 .../before_1582_timestamp_micros_v2_4_6.avro       | Bin 0 -> 249 bytes
 ...ro => before_1582_timestamp_millis_v2_4_5.avro} | Bin 244 -> 244 bytes
 .../before_1582_timestamp_millis_v2_4_6.avro       | Bin 0 -> 275 bytes
 .../test/resources/before_1582_ts_micros_v2_4.avro | Bin 218 -> 0 bytes
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 117 +++++++++++++++++----
 9 files changed, 98 insertions(+), 19 deletions(-)

diff --git a/external/avro/src/test/resources/before_1582_date_v2_4.avro b/external/avro/src/test/resources/before_1582_date_v2_4.avro
deleted file mode 100644
index 96aa7cb..0000000
Binary files a/external/avro/src/test/resources/before_1582_date_v2_4.avro and /dev/null differ
diff --git a/external/avro/src/test/resources/before_1582_date_v2_4_5.avro b/external/avro/src/test/resources/before_1582_date_v2_4_5.avro
new file mode 100644
index 0000000..5c15601
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_date_v2_4_5.avro differ
diff --git a/external/avro/src/test/resources/before_1582_date_v2_4_6.avro b/external/avro/src/test/resources/before_1582_date_v2_4_6.avro
new file mode 100644
index 0000000..212ea1d
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_date_v2_4_6.avro differ
diff --git a/external/avro/src/test/resources/before_1582_timestamp_micros_v2_4_5.avro b/external/avro/src/test/resources/before_1582_timestamp_micros_v2_4_5.avro
new file mode 100644
index 0000000..c3445e3
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_timestamp_micros_v2_4_5.avro differ
diff --git a/external/avro/src/test/resources/before_1582_timestamp_micros_v2_4_6.avro b/external/avro/src/test/resources/before_1582_timestamp_micros_v2_4_6.avro
new file mode 100644
index 0000000..96008d2
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_timestamp_micros_v2_4_6.avro differ
diff --git a/external/avro/src/test/resources/before_1582_ts_millis_v2_4.avro b/external/avro/src/test/resources/before_1582_timestamp_millis_v2_4_5.avro
similarity index 52%
rename from external/avro/src/test/resources/before_1582_ts_millis_v2_4.avro
rename to external/avro/src/test/resources/before_1582_timestamp_millis_v2_4_5.avro
index dbaec81..be12a07 100644
Binary files a/external/avro/src/test/resources/before_1582_ts_millis_v2_4.avro and b/external/avro/src/test/resources/before_1582_timestamp_millis_v2_4_5.avro differ
diff --git a/external/avro/src/test/resources/before_1582_timestamp_millis_v2_4_6.avro b/external/avro/src/test/resources/before_1582_timestamp_millis_v2_4_6.avro
new file mode 100644
index 0000000..262f5dd
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_timestamp_millis_v2_4_6.avro differ
diff --git a/external/avro/src/test/resources/before_1582_ts_micros_v2_4.avro b/external/avro/src/test/resources/before_1582_ts_micros_v2_4.avro
deleted file mode 100644
index efe5e71..0000000
Binary files a/external/avro/src/test/resources/before_1582_ts_micros_v2_4.avro and /dev/null differ
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index a5c1fb1..e2ae489 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.avro
 
 import java.io._
 import java.net.URL
-import java.nio.file.{Files, Paths}
+import java.nio.file.{Files, Paths, StandardCopyOption}
 import java.sql.{Date, Timestamp}
 import java.util.{Locale, UUID}
 
@@ -38,7 +38,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.TestingUDT.IntervalData
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.Filter
-import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, UTC}
+import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, LA, UTC}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -1529,23 +1529,82 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
     }
   }
 
+  // It generates input files for the test below:
+  // "SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps"
+  ignore("SPARK-31855: generate test files for checking compatibility with Spark 2.4") {
+    val resourceDir = "external/avro/src/test/resources"
+    val version = "2_4_6"
+    def save(
+      in: Seq[String],
+      t: String,
+      dstFile: String,
+      options: Map[String, String] = Map.empty): Unit = {
+      withTempDir { dir =>
+        in.toDF("dt")
+          .select($"dt".cast(t))
+          .repartition(1)
+          .write
+          .mode("overwrite")
+          .options(options)
+          .format("avro")
+          .save(dir.getCanonicalPath)
+        Files.copy(
+          dir.listFiles().filter(_.getName.endsWith(".avro")).head.toPath,
+          Paths.get(resourceDir, dstFile),
+          StandardCopyOption.REPLACE_EXISTING)
+      }
+    }
+    withDefaultTimeZone(LA) {
+      withSQLConf(
+        SQLConf.SESSION_LOCAL_TIMEZONE.key -> LA.getId) {
+        save(
+          Seq("1001-01-01"),
+          "date",
+          s"before_1582_date_v$version.avro")
+        save(
+          Seq("1001-01-01 01:02:03.123"),
+          "timestamp",
+          s"before_1582_timestamp_millis_v$version.avro",
+          // scalastyle:off line.size.limit
+          Map("avroSchema" ->
+            s"""
+               |  {
+               |    "namespace": "logical",
+               |    "type": "record",
+               |    "name": "test",
+               |    "fields": [
+               |      {"name": "dt", "type": ["null", {"type": "long","logicalType": "timestamp-millis"}], "default": null}
+               |    ]
+               |  }
+               |""".stripMargin))
+        // scalastyle:on line.size.limit
+        save(
+          Seq("1001-01-01 01:02:03.123456"),
+          "timestamp",
+          s"before_1582_timestamp_micros_v$version.avro")
+      }
+    }
+  }
+
   test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
     // test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together.
-    def checkReadMixedFiles(fileName: String, dt: String, dataStr: String): Unit = {
+    def checkReadMixedFiles(
+        fileName: String,
+        dt: String,
+        dataStr: String,
+        checkDefaultLegacyRead: String => Unit): Unit = {
       withTempPaths(2) { paths =>
         paths.foreach(_.delete())
         val path2_4 = getResourceAvroFilePath(fileName)
         val path3_0 = paths(0).getCanonicalPath
         val path3_0_rebase = paths(1).getCanonicalPath
         if (dt == "date") {
-          val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date"))
+          val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("dt"))
 
           // By default we should fail to write ancient datetime values.
-          var e = intercept[SparkException](df.write.format("avro").save(path3_0))
+          val e = intercept[SparkException](df.write.format("avro").save(path3_0))
           assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
-          // By default we should fail to read ancient datetime values.
-          e = intercept[SparkException](spark.read.format("avro").load(path2_4).collect())
-          assert(e.getCause.isInstanceOf[SparkUpgradeException])
+          checkDefaultLegacyRead(path2_4)
 
           withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
             df.write.format("avro").mode("overwrite").save(path3_0)
@@ -1562,25 +1621,23 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
               1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
           }
         } else {
-          val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts"))
+          val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("dt"))
           val avroSchema =
             s"""
               |{
               |  "type" : "record",
               |  "name" : "test_schema",
               |  "fields" : [
-              |    {"name": "ts", "type": {"type": "long", "logicalType": "$dt"}}
+              |    {"name": "dt", "type": {"type": "long", "logicalType": "$dt"}}
               |  ]
               |}""".stripMargin
 
           // By default we should fail to write ancient datetime values.
-          var e = intercept[SparkException] {
+          val e = intercept[SparkException] {
             df.write.format("avro").option("avroSchema", avroSchema).save(path3_0)
           }
           assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
-          // By default we should fail to read ancient datetime values.
-          e = intercept[SparkException](spark.read.format("avro").load(path2_4).collect())
-          assert(e.getCause.isInstanceOf[SparkUpgradeException])
+          checkDefaultLegacyRead(path2_4)
 
           withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
             df.write.format("avro").option("avroSchema", avroSchema).mode("overwrite").save(path3_0)
@@ -1600,11 +1657,33 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
       }
     }
 
-    checkReadMixedFiles("before_1582_date_v2_4.avro", "date", "1001-01-01")
-    checkReadMixedFiles(
-      "before_1582_ts_micros_v2_4.avro", "timestamp-micros", "1001-01-01 01:02:03.123456")
-    checkReadMixedFiles(
-      "before_1582_ts_millis_v2_4.avro", "timestamp-millis", "1001-01-01 01:02:03.124")
+    def failInRead(path: String): Unit = {
+      val e = intercept[SparkException](spark.read.format("avro").load(path).collect())
+      assert(e.getCause.isInstanceOf[SparkUpgradeException])
+    }
+    def successInRead(path: String): Unit = spark.read.format("avro").load(path).collect()
+    Seq(
+      // By default we should fail to read ancient datetime values when parquet files don't
+      // contain Spark version.
+      "2_4_5" -> failInRead _,
+      "2_4_6" -> successInRead _
+    ).foreach { case (version, checkDefaultRead) =>
+      checkReadMixedFiles(
+        s"before_1582_date_v$version.avro",
+        "date",
+        "1001-01-01",
+        checkDefaultRead)
+      checkReadMixedFiles(
+        s"before_1582_timestamp_micros_v$version.avro",
+        "timestamp-micros",
+        "1001-01-01 01:02:03.123456",
+        checkDefaultRead)
+      checkReadMixedFiles(
+        s"before_1582_timestamp_millis_v$version.avro",
+        "timestamp-millis",
+        "1001-01-01 01:02:03.123",
+        checkDefaultRead)
+    }
   }
 
   test("SPARK-31183: rebasing microseconds timestamps in write") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org