You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/03/22 03:57:21 UTC
[spark] branch branch-3.0 updated: Revert "[SPARK-31183][SQL][FOLLOWUP] Move rebase tests to `AvroSuite` and check the rebase flag out of function bodies"
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f0bfdc5 Revert "[SPARK-31183][SQL][FOLLOWUP] Move rebase tests to `AvroSuite` and check the rebase flag out of function bodies"
f0bfdc5 is described below
commit f0bfdc513a15884de8f3ffc79cc1845991082642
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Sat Mar 21 20:52:04 2020 -0700
Revert "[SPARK-31183][SQL][FOLLOWUP] Move rebase tests to `AvroSuite` and check the rebase flag out of function bodies"
This reverts commit a6f3e3b096e2d7a39e0b2fdec6452e6d633baf7e.
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../apache/spark/sql/avro/AvroDeserializer.scala | 21 ++--
.../org/apache/spark/sql/avro/AvroSerializer.scala | 18 +--
.../spark/sql/avro/AvroLogicalTypeSuite.scala | 98 +++++++++++++++-
.../org/apache/spark/sql/avro/AvroSuite.scala | 124 +++------------------
4 files changed, 130 insertions(+), 131 deletions(-)
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 3e8a7f9..b98f303 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -106,22 +106,21 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
case (LONG, TimestampType) => avroType.getLogicalType match {
// For backward compatibility, if the Avro type is Long and it is not logical type
// (the `null` case), the value is processed as timestamp type with millisecond precision.
- case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) =>
- val millis = value.asInstanceOf[Long]
- val micros = DateTimeUtils.fromMillis(millis)
- val rebasedMicros = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
- updater.setLong(ordinal, rebasedMicros)
case null | _: TimestampMillis => (updater, ordinal, value) =>
val millis = value.asInstanceOf[Long]
val micros = DateTimeUtils.fromMillis(millis)
- updater.setLong(ordinal, micros)
- case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) =>
- val micros = value.asInstanceOf[Long]
- val rebasedMicros = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
- updater.setLong(ordinal, rebasedMicros)
+ if (rebaseDateTime) {
+ updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
+ } else {
+ updater.setLong(ordinal, micros)
+ }
case _: TimestampMicros => (updater, ordinal, value) =>
val micros = value.asInstanceOf[Long]
- updater.setLong(ordinal, micros)
+ if (rebaseDateTime) {
+ updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
+ } else {
+ updater.setLong(ordinal, micros)
+ }
case other => throw new IncompatibleSchemaException(
s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
}
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 68df7c0..af9e3a5 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -149,15 +149,17 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
case (TimestampType, LONG) => avroType.getLogicalType match {
// For backward compatibility, if the Avro type is Long and it is not logical type
// (the `null` case), output the timestamp value as with millisecond precision.
- case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) =>
- val micros = getter.getLong(ordinal)
- val rebasedMicros = DateTimeUtils.rebaseGregorianToJulianMicros(micros)
- DateTimeUtils.fromMillis(rebasedMicros)
case null | _: TimestampMillis => (getter, ordinal) =>
- DateTimeUtils.fromMillis(getter.getLong(ordinal))
- case _: TimestampMicros if rebaseDateTime => (getter, ordinal) =>
- DateTimeUtils.rebaseGregorianToJulianMicros(getter.getLong(ordinal))
- case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
+ val micros = getter.getLong(ordinal)
+ val rebasedMicros = if (rebaseDateTime) {
+ DateTimeUtils.rebaseGregorianToJulianMicros(micros)
+ } else micros
+ DateTimeUtils.toMillis(rebasedMicros)
+ case _: TimestampMicros => (getter, ordinal) =>
+ val micros = getter.getLong(ordinal)
+ if (rebaseDateTime) {
+ DateTimeUtils.rebaseGregorianToJulianMicros(micros)
+ } else micros
case other => throw new IncompatibleSchemaException(
s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
}
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
index 8256965..9e89b69 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.avro
import java.io.File
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.Conversions.DecimalConversion
@@ -25,7 +25,7 @@ import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -348,6 +348,100 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
assert(msg.contains("Unscaled value too large for precision"))
}
}
+
+ private def readResourceAvroFile(name: String): DataFrame = {
+ val url = Thread.currentThread().getContextClassLoader.getResource(name)
+ spark.read.format("avro").load(url.toString)
+ }
+
+ test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+ checkAnswer(
+ readResourceAvroFile("before_1582_date_v2_4.avro"),
+ Row(java.sql.Date.valueOf("1001-01-01")))
+ checkAnswer(
+ readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
+ Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+ checkAnswer(
+ readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
+ Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
+ }
+ }
+
+ test("SPARK-31183: rebasing microseconds timestamps in write") {
+ val tsStr = "1001-01-01 01:02:03.123456"
+ val nonRebased = "1001-01-07 01:09:05.123456"
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+ Seq(tsStr).toDF("tsS")
+ .select($"tsS".cast("timestamp").as("ts"))
+ .write.format("avro")
+ .save(path)
+
+ checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
+ }
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+ checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
+ }
+ }
+ }
+
+ test("SPARK-31183: rebasing milliseconds timestamps in write") {
+ val tsStr = "1001-01-01 01:02:03.123456"
+ val rebased = "1001-01-01 01:02:03.123"
+ val nonRebased = "1001-01-07 01:09:05.123"
+ Seq(
+ """{"type": "long","logicalType": "timestamp-millis"}""",
+ """"long"""").foreach { tsType =>
+ val timestampSchema = s"""
+ |{
+ | "namespace": "logical",
+ | "type": "record",
+ | "name": "test",
+ | "fields": [
+ | {"name": "ts", "type": $tsType}
+ | ]
+ |}""".stripMargin
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+ Seq(tsStr).toDF("tsS")
+ .select($"tsS".cast("timestamp").as("ts"))
+ .write
+ .option("avroSchema", timestampSchema)
+ .format("avro")
+ .save(path)
+
+ checkAnswer(
+ spark.read.schema("ts timestamp").format("avro").load(path),
+ Row(Timestamp.valueOf(rebased)))
+ }
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+ checkAnswer(
+ spark.read.schema("ts timestamp").format("avro").load(path),
+ Row(Timestamp.valueOf(nonRebased)))
+ }
+ }
+ }
+ }
+
+ test("SPARK-31183: rebasing dates in write") {
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+ Seq("1001-01-01").toDF("dateS")
+ .select($"dateS".cast("date").as("date"))
+ .write.format("avro")
+ .save(path)
+
+ checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
+ }
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+ checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
+ }
+ }
+ }
}
class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite {
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 34a0e2b..360160c 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -21,7 +21,7 @@ import java.io._
import java.net.URL
import java.nio.file.{Files, Paths}
import java.sql.{Date, Timestamp}
-import java.util.{Locale, UUID}
+import java.util.{Locale, TimeZone, UUID}
import scala.collection.JavaConverters._
@@ -35,10 +35,9 @@ import org.apache.commons.io.FileUtils
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql._
-import org.apache.spark.sql.TestingUDT.IntervalData
+import org.apache.spark.sql.TestingUDT.{IntervalData, NullData, NullUDT}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.Filter
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -84,11 +83,6 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
}, new GenericDatumReader[Any]()).getSchema.toString(false)
}
- private def readResourceAvroFile(name: String): DataFrame = {
- val url = Thread.currentThread().getContextClassLoader.getResource(name)
- spark.read.format("avro").load(url.toString)
- }
-
test("resolve avro data source") {
val databricksAvro = "com.databricks.spark.avro"
// By default the backward compatibility for com.databricks.spark.avro is enabled.
@@ -408,19 +402,18 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
StructField("float", FloatType, true),
StructField("date", DateType, true)
))
- DateTimeTestUtils.withDefaultTimeZone(DateTimeUtils.TimeZoneUTC) {
- val rdd = spark.sparkContext.parallelize(Seq(
- Row(1f, null),
- Row(2f, new Date(1451948400000L)),
- Row(3f, new Date(1460066400500L))
- ))
- val df = spark.createDataFrame(rdd, schema)
- df.write.format("avro").save(dir.toString)
- assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
- checkAnswer(
- spark.read.format("avro").load(dir.toString).select("date"),
- Seq(Row(null), Row(new Date(1451865600000L)), Row(new Date(1459987200000L))))
- }
+ TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
+ val rdd = spark.sparkContext.parallelize(Seq(
+ Row(1f, null),
+ Row(2f, new Date(1451948400000L)),
+ Row(3f, new Date(1460066400500L))
+ ))
+ val df = spark.createDataFrame(rdd, schema)
+ df.write.format("avro").save(dir.toString)
+ assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
+ checkAnswer(
+ spark.read.format("avro").load(dir.toString).select("date"),
+ Seq(Row(null), Row(new Date(1451865600000L)), Row(new Date(1459987200000L))))
}
}
@@ -1528,95 +1521,6 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
assert(deprecatedEvents.size === 1)
}
}
-
- test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
- checkAnswer(
- readResourceAvroFile("before_1582_date_v2_4.avro"),
- Row(java.sql.Date.valueOf("1001-01-01")))
- checkAnswer(
- readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
- Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
- checkAnswer(
- readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
- Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
- }
- }
-
- test("SPARK-31183: rebasing microseconds timestamps in write") {
- val tsStr = "1001-01-01 01:02:03.123456"
- val nonRebased = "1001-01-07 01:09:05.123456"
- withTempPath { dir =>
- val path = dir.getAbsolutePath
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
- Seq(tsStr).toDF("tsS")
- .select($"tsS".cast("timestamp").as("ts"))
- .write.format("avro")
- .save(path)
-
- checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
- }
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
- checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
- }
- }
- }
-
- test("SPARK-31183: rebasing milliseconds timestamps in write") {
- val tsStr = "1001-01-01 01:02:03.123456"
- val rebased = "1001-01-01 01:02:03.123"
- val nonRebased = "1001-01-07 01:09:05.123"
- Seq(
- """{"type": "long","logicalType": "timestamp-millis"}""",
- """"long"""").foreach { tsType =>
- val timestampSchema = s"""
- |{
- | "namespace": "logical",
- | "type": "record",
- | "name": "test",
- | "fields": [
- | {"name": "ts", "type": $tsType}
- | ]
- |}""".stripMargin
- withTempPath { dir =>
- val path = dir.getAbsolutePath
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
- Seq(tsStr).toDF("tsS")
- .select($"tsS".cast("timestamp").as("ts"))
- .write
- .option("avroSchema", timestampSchema)
- .format("avro")
- .save(path)
-
- checkAnswer(
- spark.read.schema("ts timestamp").format("avro").load(path),
- Row(Timestamp.valueOf(rebased)))
- }
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
- checkAnswer(
- spark.read.schema("ts timestamp").format("avro").load(path),
- Row(Timestamp.valueOf(nonRebased)))
- }
- }
- }
- }
-
- test("SPARK-31183: rebasing dates in write") {
- withTempPath { dir =>
- val path = dir.getAbsolutePath
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
- Seq("1001-01-01").toDF("dateS")
- .select($"dateS".cast("date").as("date"))
- .write.format("avro")
- .save(path)
-
- checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
- }
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
- checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
- }
- }
- }
}
class AvroV1Suite extends AvroSuite {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org