You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/05/12 14:07:13 UTC
[spark] branch branch-3.0 updated: [SPARK-31680][SQL][TESTS]
Support Java 8 datetime types by Random data generator
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new cbe75bb [SPARK-31680][SQL][TESTS] Support Java 8 datetime types by Random data generator
cbe75bb is described below
commit cbe75bb8879ec408088eaf6944284a893bb63c92
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Tue May 12 14:05:31 2020 +0000
[SPARK-31680][SQL][TESTS] Support Java 8 datetime types by Random data generator
### What changes were proposed in this pull request?
Generates java.time.LocalDate/java.time.Instant for DateType/TimestampType, respectively, by `RandomDataGenerator.forType` when the SQL config `spark.sql.datetime.java8API.enabled` is set to `true`.
### Why are the changes needed?
To improve test coverage, and check java.time.Instant/java.time.LocalDate types in round trip tests.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
By running modified test suites `RowEncoderSuite`, `RandomDataGeneratorSuite` and `HadoopFsRelationTest`.
Closes #28502 from MaxGekk/random-java8-datetime.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit a3fafddf390fd180047a0b9ef46f052a9b6813e0)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../org/apache/spark/sql/RandomDataGenerator.scala | 105 +++++++++++++--------
.../spark/sql/RandomDataGeneratorSuite.scala | 32 ++++---
.../sql/catalyst/encoders/RowEncoderSuite.scala | 36 +++----
.../spark/sql/sources/HadoopFsRelationTest.scala | 75 ++++++++-------
4 files changed, 146 insertions(+), 102 deletions(-)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
index cf8d772..6a5bdc4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
@@ -19,13 +19,15 @@ package org.apache.spark.sql
import java.math.MathContext
import java.sql.{Date, Timestamp}
+import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
import scala.collection.mutable
import scala.util.{Random, Try}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
-import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
+import org.apache.spark.sql.catalyst.util.DateTimeConstants.{MICROS_PER_MILLIS, MILLIS_PER_DAY}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval
/**
@@ -162,7 +164,7 @@ object RandomDataGenerator {
})
case BooleanType => Some(() => rand.nextBoolean())
case DateType =>
- def uniformDateRand(rand: Random): java.sql.Date = {
+ def uniformDaysRand(rand: Random): Int = {
var milliseconds = rand.nextLong() % 253402329599999L
// -62135740800000L is the number of milliseconds before January 1, 1970, 00:00:00 GMT
// for "0001-01-01 00:00:00.000000". We need to find a
@@ -172,27 +174,37 @@ object RandomDataGenerator {
// January 1, 1970, 00:00:00 GMT for "9999-12-31 23:59:59.999999".
milliseconds = rand.nextLong() % 253402329599999L
}
- val date = DateTimeUtils.toJavaDate((milliseconds / MILLIS_PER_DAY).toInt)
- // The generated `date` is based on the hybrid calendar Julian + Gregorian since
- // 1582-10-15 but it should be valid in Proleptic Gregorian calendar too which is used
- // by Spark SQL since version 3.0 (see SPARK-26651). We try to convert `date` to
- // a local date in Proleptic Gregorian calendar to satisfy this requirement.
- // Some years are leap years in Julian calendar but not in Proleptic Gregorian calendar.
- // As the consequence of that, 29 February of such years might not exist in Proleptic
- // Gregorian calendar. When this happens, we shift the date by one day.
- Try { date.toLocalDate; date }.getOrElse(new Date(date.getTime + MILLIS_PER_DAY))
+ (milliseconds / MILLIS_PER_DAY).toInt
+ }
+ val specialDates = Seq(
+ "0001-01-01", // the first day of Common Era
+ "1582-10-15", // the cutover date from Julian to Gregorian calendar
+ "1970-01-01", // the epoch date
+ "9999-12-31" // the last supported date according to SQL standard
+ )
+ if (SQLConf.get.getConf(SQLConf.DATETIME_JAVA8API_ENABLED)) {
+ randomNumeric[LocalDate](
+ rand,
+ (rand: Random) => LocalDate.ofEpochDay(uniformDaysRand(rand)),
+ specialDates.map(LocalDate.parse))
+ } else {
+ randomNumeric[java.sql.Date](
+ rand,
+ (rand: Random) => {
+ val date = DateTimeUtils.toJavaDate(uniformDaysRand(rand))
+ // The generated `date` is based on the hybrid calendar Julian + Gregorian since
+ // 1582-10-15 but it should be valid in Proleptic Gregorian calendar too which is used
+ // by Spark SQL since version 3.0 (see SPARK-26651). We try to convert `date` to
+ // a local date in Proleptic Gregorian calendar to satisfy this requirement. Some
+ // years are leap years in Julian calendar but not in Proleptic Gregorian calendar.
+ // As the consequence of that, 29 February of such years might not exist in Proleptic
+ // Gregorian calendar. When this happens, we shift the date by one day.
+ Try { date.toLocalDate; date }.getOrElse(new Date(date.getTime + MILLIS_PER_DAY))
+ },
+ specialDates.map(java.sql.Date.valueOf))
}
- randomNumeric[java.sql.Date](
- rand,
- uniformDateRand,
- Seq(
- "0001-01-01", // the fist day of Common Era
- "1582-10-15", // the cutover date from Julian to Gregorian calendar
- "1970-01-01", // the epoch date
- "9999-12-31" // the last supported date according to SQL standard
- ).map(java.sql.Date.valueOf))
case TimestampType =>
- def uniformTimestampRand(rand: Random): java.sql.Timestamp = {
+ def uniformMicorsRand(rand: Random): Long = {
var milliseconds = rand.nextLong() % 253402329599999L
// -62135740800000L is the number of milliseconds before January 1, 1970, 00:00:00 GMT
// for "0001-01-01 00:00:00.000000". We need to find a
@@ -202,26 +214,39 @@ object RandomDataGenerator {
// January 1, 1970, 00:00:00 GMT for "9999-12-31 23:59:59.999999".
milliseconds = rand.nextLong() % 253402329599999L
}
- // DateTimeUtils.toJavaTimestamp takes microsecond.
- val ts = DateTimeUtils.toJavaTimestamp(milliseconds * 1000)
- // The generated `ts` is based on the hybrid calendar Julian + Gregorian since
- // 1582-10-15 but it should be valid in Proleptic Gregorian calendar too which is used
- // by Spark SQL since version 3.0 (see SPARK-26651). We try to convert `ts` to
- // a local timestamp in Proleptic Gregorian calendar to satisfy this requirement.
- // Some years are leap years in Julian calendar but not in Proleptic Gregorian calendar.
- // As the consequence of that, 29 February of such years might not exist in Proleptic
- // Gregorian calendar. When this happens, we shift the timestamp `ts` by one day.
- Try { ts.toLocalDateTime; ts }.getOrElse(new Timestamp(ts.getTime + MILLIS_PER_DAY))
+ milliseconds * MICROS_PER_MILLIS
+ }
+ val specialTs = Seq(
+ "0001-01-01 00:00:00", // the first timestamp of Common Era
+ "1582-10-15 23:59:59", // the cutover date from Julian to Gregorian calendar
+ "1970-01-01 00:00:00", // the epoch timestamp
+ "9999-12-31 23:59:59" // the last supported timestamp according to SQL standard
+ )
+ if (SQLConf.get.getConf(SQLConf.DATETIME_JAVA8API_ENABLED)) {
+ randomNumeric[Instant](
+ rand,
+ (rand: Random) => DateTimeUtils.microsToInstant(uniformMicorsRand(rand)),
+ specialTs.map { s =>
+ val ldt = LocalDateTime.parse(s.replace(" ", "T"))
+ ldt.atZone(ZoneId.systemDefault()).toInstant
+ })
+ } else {
+ randomNumeric[java.sql.Timestamp](
+ rand,
+ (rand: Random) => {
+ // DateTimeUtils.toJavaTimestamp takes microseconds.
+ val ts = DateTimeUtils.toJavaTimestamp(uniformMicorsRand(rand))
+ // The generated `ts` is based on the hybrid calendar Julian + Gregorian since
+ // 1582-10-15 but it should be valid in Proleptic Gregorian calendar too which is used
+ // by Spark SQL since version 3.0 (see SPARK-26651). We try to convert `ts` to
+ // a local timestamp in Proleptic Gregorian calendar to satisfy this requirement. Some
+ // years are leap years in Julian calendar but not in Proleptic Gregorian calendar.
+ // As the consequence of that, 29 February of such years might not exist in Proleptic
+ // Gregorian calendar. When this happens, we shift the timestamp `ts` by one day.
+ Try { ts.toLocalDateTime; ts }.getOrElse(new Timestamp(ts.getTime + MILLIS_PER_DAY))
+ },
+ specialTs.map(java.sql.Timestamp.valueOf))
}
- randomNumeric[java.sql.Timestamp](
- rand,
- uniformTimestampRand,
- Seq(
- "0001-01-01 00:00:00", // the fist timestamp of Common Era
- "1582-10-15 23:59:59", // the cutover date from Julian to Gregorian calendar
- "1970-01-01 00:00:00", // the epoch timestamp
- "9999-12-31 23:59:59.999" // the last supported timestamp according to SQL standard
- ).map(java.sql.Timestamp.valueOf))
case CalendarIntervalType => Some(() => {
val months = rand.nextInt(1000)
val days = rand.nextInt(10000)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala
index 3e62ca0..cb335e5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala
@@ -24,30 +24,36 @@ import scala.util.Random
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
/**
* Tests of [[RandomDataGenerator]].
*/
-class RandomDataGeneratorSuite extends SparkFunSuite {
+class RandomDataGeneratorSuite extends SparkFunSuite with SQLHelper {
/**
* Tests random data generation for the given type by using it to generate random values then
* converting those values into their Catalyst equivalents using CatalystTypeConverters.
*/
def testRandomDataGeneration(dataType: DataType, nullable: Boolean = true): Unit = {
- val toCatalyst = CatalystTypeConverters.createToCatalystConverter(dataType)
- val generator = RandomDataGenerator.forType(dataType, nullable, new Random(33)).getOrElse {
- fail(s"Random data generator was not defined for $dataType")
- }
- if (nullable) {
- assert(Iterator.fill(100)(generator()).contains(null))
- } else {
- assert(!Iterator.fill(100)(generator()).contains(null))
- }
- for (_ <- 1 to 10) {
- val generatedValue = generator()
- toCatalyst(generatedValue)
+ Seq(false, true).foreach { java8Api =>
+ withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
+ val toCatalyst = CatalystTypeConverters.createToCatalystConverter(dataType)
+ val generator = RandomDataGenerator.forType(dataType, nullable, new Random(33)).getOrElse {
+ fail(s"Random data generator was not defined for $dataType")
+ }
+ if (nullable) {
+ assert(Iterator.fill(100)(generator()).contains(null))
+ } else {
+ assert(!Iterator.fill(100)(generator()).contains(null))
+ }
+ for (_ <- 1 to 10) {
+ val generatedValue = generator()
+ toCatalyst(generatedValue)
+ }
+ }
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
index c1158e0..fd24f05 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
@@ -377,23 +377,27 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
private def encodeDecodeTest(schema: StructType): Unit = {
test(s"encode/decode: ${schema.simpleString}") {
- val encoder = RowEncoder(schema).resolveAndBind()
- val inputGenerator = RandomDataGenerator.forType(schema, nullable = false).get
-
- var input: Row = null
- try {
- for (_ <- 1 to 5) {
- input = inputGenerator.apply().asInstanceOf[Row]
- val convertedBack = roundTrip(encoder, input)
- assert(input == convertedBack)
+ Seq(false, true).foreach { java8Api =>
+ withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
+ val encoder = RowEncoder(schema).resolveAndBind()
+ val inputGenerator = RandomDataGenerator.forType(schema, nullable = false).get
+
+ var input: Row = null
+ try {
+ for (_ <- 1 to 5) {
+ input = inputGenerator.apply().asInstanceOf[Row]
+ val convertedBack = roundTrip(encoder, input)
+ assert(input == convertedBack)
+ }
+ } catch {
+ case e: Exception =>
+ fail(
+ s"""
+ |schema: ${schema.simpleString}
+ |input: ${input}
+ """.stripMargin, e)
+ }
}
- } catch {
- case e: Exception =>
- fail(
- s"""
- |schema: ${schema.simpleString}
- |input: ${input}
- """.stripMargin, e)
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 9cf1719..42b6862 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -145,40 +145,49 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
val seed = System.nanoTime()
withClue(s"Random data generated with the seed: ${seed}") {
- val dataGenerator = RandomDataGenerator.forType(
- dataType = dataType,
- nullable = true,
- new Random(seed)
- ).getOrElse {
- fail(s"Failed to create data generator for schema $dataType")
+ val java8ApiConfValues = if (dataType == DateType || dataType == TimestampType) {
+ Seq(false, true)
+ } else {
+ Seq(false)
+ }
+ java8ApiConfValues.foreach { java8Api =>
+ withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) {
+ val dataGenerator = RandomDataGenerator.forType(
+ dataType = dataType,
+ nullable = true,
+ new Random(seed)
+ ).getOrElse {
+ fail(s"Failed to create data generator for schema $dataType")
+ }
+
+ // Create a DF for the schema with random data. The index field is used to sort the
+ // DataFrame. This is a workaround for SPARK-10591.
+ val schema = new StructType()
+ .add("index", IntegerType, nullable = false)
+ .add("col", dataType, nullable = true)
+ val rdd =
+ spark.sparkContext.parallelize((1 to 20).map(i => Row(i, dataGenerator())))
+ val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
+
+ df.write
+ .mode("overwrite")
+ .format(dataSourceName)
+ .option("dataSchema", df.schema.json)
+ .options(extraOptions)
+ .save(path)
+
+ val loadedDF = spark
+ .read
+ .format(dataSourceName)
+ .option("dataSchema", df.schema.json)
+ .schema(df.schema)
+ .options(extraOptions)
+ .load(path)
+ .orderBy("index")
+
+ checkAnswer(loadedDF, df)
+ }
}
-
- // Create a DF for the schema with random data. The index field is used to sort the
- // DataFrame. This is a workaround for SPARK-10591.
- val schema = new StructType()
- .add("index", IntegerType, nullable = false)
- .add("col", dataType, nullable = true)
- val rdd =
- spark.sparkContext.parallelize((1 to 20).map(i => Row(i, dataGenerator())))
- val df = spark.createDataFrame(rdd, schema).orderBy("index").coalesce(1)
-
- df.write
- .mode("overwrite")
- .format(dataSourceName)
- .option("dataSchema", df.schema.json)
- .options(extraOptions)
- .save(path)
-
- val loadedDF = spark
- .read
- .format(dataSourceName)
- .option("dataSchema", df.schema.json)
- .schema(df.schema)
- .options(extraOptions)
- .load(path)
- .orderBy("index")
-
- checkAnswer(loadedDF, df)
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org