Posted to commits@spark.apache.org by gu...@apache.org on 2020/10/16 01:35:51 UTC

[spark] branch branch-3.0 updated: [SPARK-33163][SQL][TESTS] Check the metadata key 'org.apache.spark.legacyDateTime' in Avro/Parquet files

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new d0f1120  [SPARK-33163][SQL][TESTS] Check the metadata key 'org.apache.spark.legacyDateTime' in Avro/Parquet files
d0f1120 is described below

commit d0f1120f3fb524a52df71e03c3d28ac82f76c1a3
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Fri Oct 16 10:28:15 2020 +0900

    [SPARK-33163][SQL][TESTS] Check the metadata key 'org.apache.spark.legacyDateTime' in Avro/Parquet files
    
    ### What changes were proposed in this pull request?
    Added a couple of tests to `AvroSuite` and `ParquetIOSuite` to check that the metadata key 'org.apache.spark.legacyDateTime' is written correctly depending on the SQL configs (see the sketch below):
    - spark.sql.legacy.avro.datetimeRebaseModeInWrite
    - spark.sql.legacy.parquet.datetimeRebaseModeInWrite
    
    This is a follow-up of https://github.com/apache/spark/pull/28137.
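
    As a minimal standalone sketch of what the Avro test verifies (not part of the patch; it assumes a `spark-shell` session, so `spark` and `spark.implicits._` are available, with the external `spark-avro` module on the classpath, and the `/tmp` path is illustrative only):
    ```scala
    import java.io.File
    import java.sql.Timestamp

    import org.apache.avro.file.DataFileReader
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

    import spark.implicits._

    // Write a single timestamp with the legacy rebase mode enabled.
    spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInWrite", "LEGACY")
    val dir = new File("/tmp/avro-legacy-demo")  // illustrative, must not already exist
    Seq(Timestamp.valueOf("2020-10-15 01:02:03")).toDF()
      .repartition(1)  // a single output file keeps the check simple
      .write
      .format("avro")
      .save(dir.getAbsolutePath)

    // Open the one data file with Avro's own reader and fetch the metadata key.
    val avroFile = dir.listFiles()
      .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
      .head
    val reader = DataFileReader.openReader(avroFile, new GenericDatumReader[GenericRecord]())
    // Expected: "" (empty string) under LEGACY; null under CORRECTED/EXCEPTION.
    println(reader.asInstanceOf[DataFileReader[_]].getMetaString("org.apache.spark.legacyDateTime"))
    ```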
    
    ### Why are the changes needed?
    1. To improve test coverage
    2. To make sure that the metadata key is actually saved to Avro/Parquet files (see the Parquet sketch below)
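
    For item 2, a minimal standalone sketch of reading the Parquet key-value metadata back, mirroring the `getMetaData` helper added in this patch (the file path is illustrative only):
    ```scala
    import scala.collection.JavaConverters._

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.HadoopReadOptions
    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.parquet.hadoop.util.HadoopInputFile

    // Read the key-value metadata of a single Parquet file from its footer.
    def keyValueMetaData(parquetFile: String): Map[String, String] = {
      val conf = new Configuration()
      val inputFile = HadoopInputFile.fromPath(new Path(parquetFile), conf)
      val reader = ParquetFileReader.open(inputFile, HadoopReadOptions.builder(conf).build())
      try reader.getFileMetaData.getKeyValueMetaData.asScala.toMap
      finally reader.close()
    }

    // Expected: Some("") when written under LEGACY rebase mode, None otherwise.
    println(keyValueMetaData("/tmp/parquet-legacy-demo/part-00000.snappy.parquet")
      .get("org.apache.spark.legacyDateTime"))
    ```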
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    By running the added tests:
    ```
    $ build/sbt "testOnly org.apache.spark.sql.execution.datasources.parquet.ParquetIOSuite"
    $ build/sbt "avro/test:testOnly org.apache.spark.sql.avro.AvroV1Suite"
    $ build/sbt "avro/test:testOnly org.apache.spark.sql.avro.AvroV2Suite"
    ```
    
    Closes #30061 from MaxGekk/parquet-test-metakey.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit 38c05af1d5538fc6ad00cdb57c1a90e90d04e25d)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 40 ++++++++++++++---
 .../datasources/parquet/ParquetIOSuite.scala       | 51 +++++++++++++++++-----
 2 files changed, 73 insertions(+), 18 deletions(-)

diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index d2f49ae..5d7d2e4 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -1788,15 +1788,19 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
     }
   }
 
+  private def checkMetaData(path: java.io.File, key: String, expectedValue: String): Unit = {
+    val avroFiles = path.listFiles()
+      .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+    assert(avroFiles.length === 1)
+    val reader = DataFileReader.openReader(avroFiles(0), new GenericDatumReader[GenericRecord]())
+    val value = reader.asInstanceOf[DataFileReader[_]].getMetaString(key)
+    assert(value === expectedValue)
+  }
+
   test("SPARK-31327: Write Spark version into Avro file metadata") {
     withTempPath { path =>
       spark.range(1).repartition(1).write.format("avro").save(path.getCanonicalPath)
-      val avroFiles = path.listFiles()
-        .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
-      assert(avroFiles.length === 1)
-      val reader = DataFileReader.openReader(avroFiles(0), new GenericDatumReader[GenericRecord]())
-      val version = reader.asInstanceOf[DataFileReader[_]].getMetaString(SPARK_VERSION_METADATA_KEY)
-      assert(version === SPARK_VERSION_SHORT)
+      checkMetaData(path, SPARK_VERSION_METADATA_KEY, SPARK_VERSION_SHORT)
     }
   }
 
@@ -1809,6 +1813,30 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
       spark.read.format("avro").options(conf).load(path)
     }
   }
+
+  test("SPARK-33163: write the metadata key 'org.apache.spark.legacyDateTime'") {
+    def saveTs(dir: java.io.File): Unit = {
+      Seq(Timestamp.valueOf("2020-10-15 01:02:03")).toDF()
+        .repartition(1)
+        .write
+        .format("avro")
+        .save(dir.getAbsolutePath)
+    }
+    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
+      withTempPath { dir =>
+        saveTs(dir)
+        checkMetaData(dir, SPARK_LEGACY_DATETIME, "")
+      }
+    }
+    Seq(CORRECTED, EXCEPTION).foreach { mode =>
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> mode.toString) {
+        withTempPath { dir =>
+          saveTs(dir)
+          checkMetaData(dir, SPARK_LEGACY_DATETIME, null)
+        }
+      }
+    }
+  }
 }
 
 class AvroV1Suite extends AvroSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 2dc8a06..ff406f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -859,20 +859,24 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     }
   }
 
-  test("Write Spark version into Parquet metadata") {
-    withTempPath { dir =>
-      val path = dir.getAbsolutePath
-      spark.range(1).repartition(1).write.parquet(path)
-      val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
-
-      val conf = new Configuration()
-      val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), conf)
-      val parquetReadOptions = HadoopReadOptions.builder(conf).build()
-      val m = ParquetFileReader.open(hadoopInputFile, parquetReadOptions)
-      val metaData = m.getFileMetaData.getKeyValueMetaData
+  private def getMetaData(dir: java.io.File): Map[String, String] = {
+    val file = SpecificParquetRecordReaderBase.listDirectory(dir).get(0)
+    val conf = new Configuration()
+    val hadoopInputFile = HadoopInputFile.fromPath(new Path(file), conf)
+    val parquetReadOptions = HadoopReadOptions.builder(conf).build()
+    val m = ParquetFileReader.open(hadoopInputFile, parquetReadOptions)
+    val metadata = try {
+      m.getFileMetaData.getKeyValueMetaData
+    } finally {
       m.close()
+    }
+    metadata.asScala.toMap
+  }
 
-      assert(metaData.get(SPARK_VERSION_METADATA_KEY) === SPARK_VERSION_SHORT)
+  test("Write Spark version into Parquet metadata") {
+    withTempPath { dir =>
+      spark.range(1).repartition(1).write.parquet(dir.getAbsolutePath)
+      assert(getMetaData(dir)(SPARK_VERSION_METADATA_KEY) === SPARK_VERSION_SHORT)
     }
   }
 
@@ -1109,6 +1113,29 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
       }
     }
   }
+
+  test("SPARK-33163: write the metadata key 'org.apache.spark.legacyDateTime'") {
+    def saveTs(dir: java.io.File): Unit = {
+      Seq(Timestamp.valueOf("2020-10-15 01:02:03")).toDF()
+        .repartition(1)
+        .write
+        .parquet(dir.getAbsolutePath)
+    }
+    withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
+      withTempPath { dir =>
+        saveTs(dir)
+        assert(getMetaData(dir)(SPARK_LEGACY_DATETIME) === "")
+      }
+    }
+    Seq(CORRECTED, EXCEPTION).foreach { mode =>
+      withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> mode.toString) {
+        withTempPath { dir =>
+          saveTs(dir)
+          assert(getMetaData(dir).get(SPARK_LEGACY_DATETIME).isEmpty)
+        }
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)

