Posted to commits@spark.apache.org by we...@apache.org on 2020/05/12 14:07:13 UTC

[spark] branch branch-3.0 updated: [SPARK-31680][SQL][TESTS] Support Java 8 datetime types by Random data generator

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new cbe75bb  [SPARK-31680][SQL][TESTS] Support Java 8 datetime types by Random data generator
cbe75bb is described below

commit cbe75bb8879ec408088eaf6944284a893bb63c92
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Tue May 12 14:05:31 2020 +0000

    [SPARK-31680][SQL][TESTS] Support Java 8 datetime types by Random data generator
    
    ### What changes were proposed in this pull request?
    Generates java.time.LocalDate/java.time.Instant for DateType/TimestampType in `RandomDataGenerator.forType` when the SQL config `spark.sql.datetime.java8API.enabled` is set to `true`.
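    
    As a rough illustration of the idea (a minimal standalone sketch, not the patch itself): a plain boolean stands in for the SQL config, and the Julian/Gregorian leap-day handling that the real generator applies to `java.sql.Date` values is omitted.
    
    ```scala
    import java.time.LocalDate
    import scala.util.Random
    
    object RandomDateSketch {
      // Plain boolean standing in for spark.sql.datetime.java8API.enabled;
      // the real generator reads SQLConf.DATETIME_JAVA8API_ENABLED instead.
      val java8ApiEnabled = true
    
      private val minDay = LocalDate.of(1, 1, 1).toEpochDay      // 0001-01-01
      private val maxDay = LocalDate.of(9999, 12, 31).toEpochDay // 9999-12-31
      private val span = maxDay - minDay + 1
    
      // Uniformly pick an epoch-day count in [minDay, maxDay] and surface it as
      // either the Java 8 type (LocalDate) or the legacy type (java.sql.Date).
      def randomDate(rand: Random): Any = {
        val days = minDay + (((rand.nextLong() % span) + span) % span)
        val localDate = LocalDate.ofEpochDay(days)
        if (java8ApiEnabled) localDate else java.sql.Date.valueOf(localDate)
      }
    
      def main(args: Array[String]): Unit = {
        val rand = new Random(33)
        (1 to 3).foreach(_ => println(randomDate(rand)))
      }
    }
    ```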
    
    ### Why are the changes needed?
    To improve test coverage, and to check java.time.LocalDate/java.time.Instant values in round-trip tests.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    By running the modified test suites `RowEncoderSuite`, `RandomDataGeneratorSuite`, and `HadoopFsRelationTest`.
    
    Closes #28502 from MaxGekk/random-java8-datetime.
    
    Authored-by: Max Gekk <ma...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit a3fafddf390fd180047a0b9ef46f052a9b6813e0)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/RandomDataGenerator.scala | 105 +++++++++++++--------
 .../spark/sql/RandomDataGeneratorSuite.scala       |  32 ++++---
 .../sql/catalyst/encoders/RowEncoderSuite.scala    |  36 +++----
 .../spark/sql/sources/HadoopFsRelationTest.scala   |  75 ++++++++-------
 4 files changed, 146 insertions(+), 102 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
index cf8d772..6a5bdc4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
@@ -19,13 +19,15 @@ package org.apache.spark.sql
 
 import java.math.MathContext
 import java.sql.{Date, Timestamp}
+import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
 
 import scala.collection.mutable
 import scala.util.{Random, Try}
 
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.{MICROS_PER_MILLIS, MILLIS_PER_DAY}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
 /**
@@ -162,7 +164,7 @@ object RandomDataGenerator {
       })
       case BooleanType => Some(() => rand.nextBoolean())
       case DateType =>
-        def uniformDateRand(rand: Random): java.sql.Date = {
+        def uniformDaysRand(rand: Random): Int = {
           var milliseconds = rand.nextLong() % 253402329599999L
           // -62135740800000L is the number of milliseconds before January 1, 1970, 00:00:00 GMT
           // for "0001-01-01 00:00:00.000000". We need to find a
@@ -172,27 +174,37 @@ object RandomDataGenerator {
             // January 1, 1970, 00:00:00 GMT for "9999-12-31 23:59:59.999999".
             milliseconds = rand.nextLong() % 253402329599999L
           }
-          val date = DateTimeUtils.toJavaDate((milliseconds / MILLIS_PER_DAY).toInt)
-          // The generated `date` is based on the hybrid calendar Julian + Gregorian since
-          // 1582-10-15 but it should be valid in Proleptic Gregorian calendar too which is used
-          // by Spark SQL since version 3.0 (see SPARK-26651). We try to convert `date` to
-          // a local date in Proleptic Gregorian calendar to satisfy this requirement.
-          // Some years are leap years in Julian calendar but not in Proleptic Gregorian calendar.
-          // As a consequence, 29 February of such years might not exist in Proleptic
-          // Gregorian calendar. When this happens, we shift the date by one day.
-          Try { date.toLocalDate; date }.getOrElse(new Date(date.getTime + MILLIS_PER_DAY))
+          (milliseconds / MILLIS_PER_DAY).toInt
+        }
+        val specialDates = Seq(
+          "0001-01-01", // the fist day of Common Era
+          "1582-10-15", // the cutover date from Julian to Gregorian calendar
+          "1970-01-01", // the epoch date
+          "9999-12-31" // the last supported date according to SQL standard
+        )
+        if (SQLConf.get.getConf(SQLConf.DATETIME_JAVA8API_ENABLED)) {
+          randomNumeric[LocalDate](
+            rand,
+            (rand: Random) => LocalDate.ofEpochDay(uniformDaysRand(rand)),
+            specialDates.map(LocalDate.parse))
+        } else {
+          randomNumeric[java.sql.Date](
+            rand,
+            (rand: Random) => {
+              val date = DateTimeUtils.toJavaDate(uniformDaysRand(rand))
+              // The generated `date` is based on the hybrid calendar Julian + Gregorian since
+              // 1582-10-15 but it should be valid in Proleptic Gregorian calendar too which is used
+              // by Spark SQL since version 3.0 (see SPARK-26651). We try to convert `date` to
+              // a local date in Proleptic Gregorian calendar to satisfy this requirement. Some
+              // years are leap years in Julian calendar but not in Proleptic Gregorian calendar.
+              // As a consequence, 29 February of such years might not exist in Proleptic
+              // Gregorian calendar. When this happens, we shift the date by one day.
+              Try { date.toLocalDate; date }.getOrElse(new Date(date.getTime + MILLIS_PER_DAY))
+            },
+            specialDates.map(java.sql.Date.valueOf))
         }
-        randomNumeric[java.sql.Date](
-          rand,
-          uniformDateRand,
-          Seq(
-            "0001-01-01", // the fist day of Common Era
-            "1582-10-15", // the cutover date from Julian to Gregorian calendar
-            "1970-01-01", // the epoch date
-            "9999-12-31"  // the last supported date according to SQL standard
-          ).map(java.sql.Date.valueOf))
       case TimestampType =>
-        def uniformTimestampRand(rand: Random): java.sql.Timestamp = {
+        def uniformMicrosRand(rand: Random): Long = {
           var milliseconds = rand.nextLong() % 253402329599999L
           // -62135740800000L is the number of milliseconds before January 1, 1970, 00:00:00 GMT
           // for "0001-01-01 00:00:00.000000". We need to find a
@@ -202,26 +214,39 @@ object RandomDataGenerator {
             // January 1, 1970, 00:00:00 GMT for "9999-12-31 23:59:59.999999".
             milliseconds = rand.nextLong() % 253402329599999L
           }
-          // DateTimeUtils.toJavaTimestamp takes microseconds.
-          val ts = DateTimeUtils.toJavaTimestamp(milliseconds * 1000)
-          // The generated `ts` is based on the hybrid calendar Julian + Gregorian since
-          // 1582-10-15 but it should be valid in Proleptic Gregorian calendar too which is used
-          // by Spark SQL since version 3.0 (see SPARK-26651). We try to convert `ts` to
-          // a local timestamp in Proleptic Gregorian calendar to satisfy this requirement.
-          // Some years are leap years in Julian calendar but not in Proleptic Gregorian calendar.
-          // As a consequence, 29 February of such years might not exist in Proleptic
-          // Gregorian calendar. When this happens, we shift the timestamp `ts` by one day.
-          Try { ts.toLocalDateTime; ts }.getOrElse(new Timestamp(ts.getTime + MILLIS_PER_DAY))
+          milliseconds * MICROS_PER_MILLIS
+        }
+        val specialTs = Seq(
+          "0001-01-01 00:00:00", // the fist timestamp of Common Era
+          "1582-10-15 23:59:59", // the cutover date from Julian to Gregorian calendar
+          "1970-01-01 00:00:00", // the epoch timestamp
+          "9999-12-31 23:59:59"  // the last supported timestamp according to SQL standard
+        )
+        if (SQLConf.get.getConf(SQLConf.DATETIME_JAVA8API_ENABLED)) {
+          randomNumeric[Instant](
+            rand,
+            (rand: Random) => DateTimeUtils.microsToInstant(uniformMicrosRand(rand)),
+            specialTs.map { s =>
+              val ldt = LocalDateTime.parse(s.replace(" ", "T"))
+              ldt.atZone(ZoneId.systemDefault()).toInstant
+            })
+        } else {
+          randomNumeric[java.sql.Timestamp](
+            rand,
+            (rand: Random) => {
+              // DateTimeUtils.toJavaTimestamp takes microseconds.
+              val ts = DateTimeUtils.toJavaTimestamp(uniformMicrosRand(rand))
+              // The generated `ts` is based on the hybrid calendar Julian + Gregorian since
+              // 1582-10-15 but it should be valid in Proleptic Gregorian calendar too which is used
+              // by Spark SQL since version 3.0 (see SPARK-26651). We try to convert `ts` to
+              // a local timestamp in Proleptic Gregorian calendar to satisfy this requirement. Some
+              // years are leap years in Julian calendar but not in Proleptic Gregorian calendar.
+              // As a consequence, 29 February of such years might not exist in Proleptic
+              // Gregorian calendar. When this happens, we shift the timestamp `ts` by one day.
+              Try { ts.toLocalDateTime; ts }.getOrElse(new Timestamp(ts.getTime + MILLIS_PER_DAY))
+            },
+            specialTs.map(java.sql.Timestamp.valueOf))
         }
-        randomNumeric[java.sql.Timestamp](
-          rand,
-          uniformTimestampRand,
-          Seq(
-            "0001-01-01 00:00:00", // the fist timestamp of Common Era
-            "1582-10-15 23:59:59", // the cutover date from Julian to Gregorian calendar
-            "1970-01-01 00:00:00", // the epoch timestamp
-            "9999-12-31 23:59:59.999" // the last supported timestamp according to SQL standard
-          ).map(java.sql.Timestamp.valueOf))
       case CalendarIntervalType => Some(() => {
         val months = rand.nextInt(1000)
         val days = rand.nextInt(10000)
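
As an aside on the timestamp branch above: the following is a hedged restatement in plain JDK terms, not Spark's code. The `microsToInstant` helper below is illustrative of what `DateTimeUtils.microsToInstant` accomplishes, and the parsing shows why the patch replaces the space with 'T' before handing the special timestamps to `LocalDateTime.parse`.

```scala
import java.time.{Instant, LocalDateTime, ZoneId}

object SpecialTimestampsSketch {
  // One way to turn microseconds since the epoch into an Instant; Spark's
  // DateTimeUtils.microsToInstant performs the equivalent conversion.
  def microsToInstant(micros: Long): Instant = {
    val secs = Math.floorDiv(micros, 1000000L)
    val nanos = Math.floorMod(micros, 1000000L) * 1000L
    Instant.ofEpochSecond(secs, nanos)
  }

  def main(args: Array[String]): Unit = {
    // The boundary timestamps use the "yyyy-MM-dd HH:mm:ss" form accepted by
    // java.sql.Timestamp.valueOf, while LocalDateTime.parse expects ISO-8601,
    // hence the space-to-'T' replacement; the local time is then resolved
    // against the JVM default zone, mirroring the patch.
    val specialTs = Seq(
      "0001-01-01 00:00:00",
      "1582-10-15 23:59:59",
      "1970-01-01 00:00:00",
      "9999-12-31 23:59:59")
    specialTs
      .map(s => LocalDateTime.parse(s.replace(" ", "T")).atZone(ZoneId.systemDefault()).toInstant)
      .foreach(println)
    println(microsToInstant(-62135596800000000L)) // 0001-01-01T00:00:00Z
  }
}
```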
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala
index 3e62ca0..cb335e5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala
@@ -24,30 +24,36 @@ import scala.util.Random
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
 /**
  * Tests of [[RandomDataGenerator]].
  */
-class RandomDataGeneratorSuite extends SparkFunSuite {
+class RandomDataGeneratorSuite extends SparkFunSuite with SQLHelper {
 
   /**
    * Tests random data generation for the given type by using it to generate random values then
    * converting those values into their Catalyst equivalents using CatalystTypeConverters.
    */
   def testRandomDataGeneration(dataType: DataType, nullable: Boolean = true): Unit = {
-    val toCatalyst = CatalystTypeConverters.createToCatalystConverter(dataType)
-    val generator = RandomDataGenerator.forType(dataType, nullable, new Random(33)).getOrElse {
-      fail(s"Random data generator was not defined for $dataType")
-    }
-    if (nullable) {
-      assert(Iterator.fill(100)(generator()).contains(null))
-    } else {
-      assert(!Iterator.fill(100)(generator()).contains(null))
-    }
-    for (_ <- 1 to 10) {
-      val generatedValue = generator()
-      toCatalyst(generatedValue)
+    Seq(false, true).foreach { java8Api =>
+      withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
+        val toCatalyst = CatalystTypeConverters.createToCatalystConverter(dataType)
+        val generator = RandomDataGenerator.forType(dataType, nullable, new Random(33)).getOrElse {
+          fail(s"Random data generator was not defined for $dataType")
+        }
+        if (nullable) {
+          assert(Iterator.fill(100)(generator()).contains(null))
+        } else {
+          assert(!Iterator.fill(100)(generator()).contains(null))
+        }
+        for (_ <- 1 to 10) {
+          val generatedValue = generator()
+          toCatalyst(generatedValue)
+        }
+      }
     }
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
index c1158e0..fd24f05 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
@@ -377,23 +377,27 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
 
   private def encodeDecodeTest(schema: StructType): Unit = {
     test(s"encode/decode: ${schema.simpleString}") {
-      val encoder = RowEncoder(schema).resolveAndBind()
-      val inputGenerator = RandomDataGenerator.forType(schema, nullable = false).get
-
-      var input: Row = null
-      try {
-        for (_ <- 1 to 5) {
-          input = inputGenerator.apply().asInstanceOf[Row]
-          val convertedBack = roundTrip(encoder, input)
-          assert(input == convertedBack)
+      Seq(false, true).foreach { java8Api =>
+        withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
+          val encoder = RowEncoder(schema).resolveAndBind()
+          val inputGenerator = RandomDataGenerator.forType(schema, nullable = false).get
+
+          var input: Row = null
+          try {
+            for (_ <- 1 to 5) {
+              input = inputGenerator.apply().asInstanceOf[Row]
+              val convertedBack = roundTrip(encoder, input)
+              assert(input == convertedBack)
+            }
+          } catch {
+            case e: Exception =>
+              fail(
+                s"""
+                   |schema: ${schema.simpleString}
+                   |input: ${input}
+                 """.stripMargin, e)
+          }
         }
-      } catch {
-        case e: Exception =>
-          fail(
-            s"""
-               |schema: ${schema.simpleString}
-               |input: ${input}
-             """.stripMargin, e)
       }
     }
   }
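
The round trip asserted above can be pictured with a much smaller self-contained analogue (a sketch, not the `RowEncoder` machinery itself): Catalyst stores `DateType` as a day count since the epoch, so encoding and decoding must compose to the identity for `assert(input == convertedBack)` to hold.

```scala
import java.time.LocalDate

object RoundTripSketch {
  // DateType is internally an Int day count; these two functions are a toy
  // stand-in for the encoder/decoder pair exercised by the suite.
  def encode(d: LocalDate): Int = d.toEpochDay.toInt
  def decode(days: Int): LocalDate = LocalDate.ofEpochDay(days.toLong)

  def main(args: Array[String]): Unit = {
    val input = LocalDate.parse("1582-10-15")
    val convertedBack = decode(encode(input))
    assert(input == convertedBack, s"round trip failed for $input")
    println(s"$input survived the round trip")
  }
}
```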
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 9cf1719..42b6862 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -145,40 +145,49 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
 
           val seed = System.nanoTime()
           withClue(s"Random data generated with the seed: ${seed}") {
-            val dataGenerator = RandomDataGenerator.forType(
-              dataType = dataType,
-              nullable = true,
-              new Random(seed)
-            ).getOrElse {
-              fail(s"Failed to create data generator for schema $dataType")
+            val java8ApiConfValues = if (dataType == DateType || dataType == TimestampType) {
+              Seq(false, true)
+            } else {
+              Seq(false)
+            }
+            java8ApiConfValues.foreach { java8Api =>
+              withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
+                val dataGenerator = RandomDataGenerator.forType(
+                  dataType = dataType,
+                  nullable = true,
+                  new Random(seed)
+                ).getOrElse {
+                  fail(s"Failed to create data generator for schema $dataType")
+                }
+
+                // Create a DF for the schema with random data. The index field is used to sort the
+                // DataFrame.  This is a workaround for SPARK-10591.
+                val schema = new StructType()
+                  .add("index", IntegerType, nullable = false)
+                  .add("col", dataType, nullable = true)
+                val rdd =
+                  spark.sparkContext.parallelize((1 to 20).map(i => Row(i, dataGenerator())))
+                val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
+
+                df.write
+                  .mode("overwrite")
+                  .format(dataSourceName)
+                  .option("dataSchema", df.schema.json)
+                  .options(extraOptions)
+                  .save(path)
+
+                val loadedDF = spark
+                  .read
+                  .format(dataSourceName)
+                  .option("dataSchema", df.schema.json)
+                  .schema(df.schema)
+                  .options(extraOptions)
+                  .load(path)
+                  .orderBy("index")
+
+                checkAnswer(loadedDF, df)
+              }
             }
-
-            // Create a DF for the schema with random data. The index field is used to sort the
-            // DataFrame.  This is a workaround for SPARK-10591.
-            val schema = new StructType()
-              .add("index", IntegerType, nullable = false)
-              .add("col", dataType, nullable = true)
-            val rdd =
-              spark.sparkContext.parallelize((1 to 20).map(i => Row(i, dataGenerator())))
-            val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
-
-            df.write
-              .mode("overwrite")
-              .format(dataSourceName)
-              .option("dataSchema", df.schema.json)
-              .options(extraOptions)
-              .save(path)
-
-            val loadedDF = spark
-              .read
-              .format(dataSourceName)
-              .option("dataSchema", df.schema.json)
-              .schema(df.schema)
-              .options(extraOptions)
-              .load(path)
-              .orderBy("index")
-
-            checkAnswer(loadedDF, df)
           }
         }
       }
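
A self-contained analogue of this save/load round trip, assuming a local session and the built-in parquet source in place of the suite's `dataSourceName` and `extraOptions`; the master, seed, and temp path below are illustrative choices, not part of the patch.

```scala
import scala.util.Random

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{DateType, IntegerType, StructType}

object SaveLoadRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("round-trip").getOrCreate()
    // With the Java 8 API enabled, DateType columns are collected as
    // java.time.LocalDate instead of java.sql.Date.
    spark.conf.set("spark.sql.datetime.java8API.enabled", "true")

    val schema = new StructType()
      .add("index", IntegerType, nullable = false)
      .add("col", DateType, nullable = true)
    val rand = new Random(42)
    val rows = (1 to 20).map(i => Row(i, java.time.LocalDate.ofEpochDay(rand.nextInt(20000).toLong)))
    // The index column keeps row order stable across the write/read cycle.
    val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
      .orderBy("index").coalesce(1)

    val path = java.nio.file.Files.createTempDirectory("round-trip").toString
    df.write.mode("overwrite").parquet(path)
    val loaded = spark.read.schema(df.schema).parquet(path).orderBy("index")

    assert(loaded.collect().sameElements(df.collect()), "loaded data differs from written data")
    spark.stop()
  }
}
```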

