You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/05/29 05:21:47 UTC
[spark] branch branch-3.0 updated: [SPARK-31855][SQL][TESTS] Check
reading date/timestamp from Avro files w/ and w/o Spark version
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new d7e6e08 [SPARK-31855][SQL][TESTS] Check reading date/timestamp from Avro files w/ and w/o Spark version
d7e6e08 is described below
commit d7e6e08f34a8a63027cb1651f8d225454f494fc7
Author: Max Gekk <ma...@gmail.com>
AuthorDate: Fri May 29 05:18:37 2020 +0000
[SPARK-31855][SQL][TESTS] Check reading date/timestamp from Avro files w/ and w/o Spark version
### What changes were proposed in this pull request?
1. Add the following Avro files to the resource folder `external/avro/src/test/resources`:
- Files saved by Spark 2.4.5 (https://github.com/apache/spark/commit/cee4ecbb16917fa85f02c635925e2687400aa56b) without the metadata key `org.apache.spark.version`:
- `before_1582_date_v2_4_5.avro` with a date column: `avro.schema {"type":"record","name":"topLevelRecord","fields":[{"name":"dt","type":[{"type":"int","logicalType":"date"},"null"]}]}`
- `before_1582_timestamp_millis_v2_4_5.avro` with a timestamp column: `avro.schema {"type":"record","name":"test","namespace":"logical","fields":[{"name":"dt","type":["null",{"type":"long","logicalType":"timestamp-millis"}],"default":null}]}`
- `before_1582_timestamp_micros_v2_4_5.avro` with a timestamp column: `avro.schema {"type":"record","name":"topLevelRecord","fields":[{"name":"dt","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]}]}`
- Files saved by Spark 2.4.6-rc3 (https://github.com/apache/spark/commit/570848da7c48ba0cb827ada997e51677ff672a39) with the metadata key `org.apache.spark.version` set to `2.4.6`:
- `before_1582_date_v2_4_6.avro` is similar to `before_1582_date_v2_4_5.avro` except for the Spark version in the Avro file metadata.
- `before_1582_timestamp_micros_v2_4_6.avro` is similar to `before_1582_timestamp_micros_v2_4_5.avro` except for the Spark version in the Avro file metadata.
- `before_1582_timestamp_millis_v2_4_6.avro` is similar to `before_1582_timestamp_millis_v2_4_5.avro` except for the Spark version in the Avro file metadata.
2. Remove a few Avro files because they are replaced by the Avro files generated by Spark 2.4.5 above.
3. Add a new test "SPARK-31855: generate test files for checking compatibility with Spark 2.4" to `AvroSuite` (marked as ignored). The Avro files above were generated by this test.
4. Modify "SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps" in `AvroSuite` to use the new Avro files.
### Why are the changes needed?
To improve test coverage.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
By `AvroV1Suite` and `AvroV2Suite`.
Closes #28664 from MaxGekk/avro-update-resource-files.
Authored-by: Max Gekk <ma...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 37a1fb8d089190929b77421fbb449bf0ed1ab25e)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../src/test/resources/before_1582_date_v2_4.avro | Bin 202 -> 0 bytes
.../test/resources/before_1582_date_v2_4_5.avro | Bin 0 -> 200 bytes
.../test/resources/before_1582_date_v2_4_6.avro | Bin 0 -> 231 bytes
.../before_1582_timestamp_micros_v2_4_5.avro | Bin 0 -> 218 bytes
.../before_1582_timestamp_micros_v2_4_6.avro | Bin 0 -> 249 bytes
...ro => before_1582_timestamp_millis_v2_4_5.avro} | Bin 244 -> 244 bytes
.../before_1582_timestamp_millis_v2_4_6.avro | Bin 0 -> 275 bytes
.../test/resources/before_1582_ts_micros_v2_4.avro | Bin 218 -> 0 bytes
.../org/apache/spark/sql/avro/AvroSuite.scala | 117 +++++++++++++++++----
9 files changed, 98 insertions(+), 19 deletions(-)
diff --git a/external/avro/src/test/resources/before_1582_date_v2_4.avro b/external/avro/src/test/resources/before_1582_date_v2_4.avro
deleted file mode 100644
index 96aa7cb..0000000
Binary files a/external/avro/src/test/resources/before_1582_date_v2_4.avro and /dev/null differ
diff --git a/external/avro/src/test/resources/before_1582_date_v2_4_5.avro b/external/avro/src/test/resources/before_1582_date_v2_4_5.avro
new file mode 100644
index 0000000..5c15601
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_date_v2_4_5.avro differ
diff --git a/external/avro/src/test/resources/before_1582_date_v2_4_6.avro b/external/avro/src/test/resources/before_1582_date_v2_4_6.avro
new file mode 100644
index 0000000..212ea1d
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_date_v2_4_6.avro differ
diff --git a/external/avro/src/test/resources/before_1582_timestamp_micros_v2_4_5.avro b/external/avro/src/test/resources/before_1582_timestamp_micros_v2_4_5.avro
new file mode 100644
index 0000000..c3445e3
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_timestamp_micros_v2_4_5.avro differ
diff --git a/external/avro/src/test/resources/before_1582_timestamp_micros_v2_4_6.avro b/external/avro/src/test/resources/before_1582_timestamp_micros_v2_4_6.avro
new file mode 100644
index 0000000..96008d2
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_timestamp_micros_v2_4_6.avro differ
diff --git a/external/avro/src/test/resources/before_1582_ts_millis_v2_4.avro b/external/avro/src/test/resources/before_1582_timestamp_millis_v2_4_5.avro
similarity index 52%
rename from external/avro/src/test/resources/before_1582_ts_millis_v2_4.avro
rename to external/avro/src/test/resources/before_1582_timestamp_millis_v2_4_5.avro
index dbaec81..be12a07 100644
Binary files a/external/avro/src/test/resources/before_1582_ts_millis_v2_4.avro and b/external/avro/src/test/resources/before_1582_timestamp_millis_v2_4_5.avro differ
diff --git a/external/avro/src/test/resources/before_1582_timestamp_millis_v2_4_6.avro b/external/avro/src/test/resources/before_1582_timestamp_millis_v2_4_6.avro
new file mode 100644
index 0000000..262f5dd
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_timestamp_millis_v2_4_6.avro differ
diff --git a/external/avro/src/test/resources/before_1582_ts_micros_v2_4.avro b/external/avro/src/test/resources/before_1582_ts_micros_v2_4.avro
deleted file mode 100644
index efe5e71..0000000
Binary files a/external/avro/src/test/resources/before_1582_ts_micros_v2_4.avro and /dev/null differ
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index a5c1fb1..e2ae489 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.avro
import java.io._
import java.net.URL
-import java.nio.file.{Files, Paths}
+import java.nio.file.{Files, Paths, StandardCopyOption}
import java.sql.{Date, Timestamp}
import java.util.{Locale, UUID}
@@ -38,7 +38,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.TestingUDT.IntervalData
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.Filter
-import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, UTC}
+import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, LA, UTC}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -1529,23 +1529,82 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
}
}
+ // It generates input files for the test below:
+ // "SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps"
+ ignore("SPARK-31855: generate test files for checking compatibility with Spark 2.4") {
+ val resourceDir = "external/avro/src/test/resources"
+ val version = "2_4_6"
+ def save(
+ in: Seq[String],
+ t: String,
+ dstFile: String,
+ options: Map[String, String] = Map.empty): Unit = {
+ withTempDir { dir =>
+ in.toDF("dt")
+ .select($"dt".cast(t))
+ .repartition(1)
+ .write
+ .mode("overwrite")
+ .options(options)
+ .format("avro")
+ .save(dir.getCanonicalPath)
+ Files.copy(
+ dir.listFiles().filter(_.getName.endsWith(".avro")).head.toPath,
+ Paths.get(resourceDir, dstFile),
+ StandardCopyOption.REPLACE_EXISTING)
+ }
+ }
+ withDefaultTimeZone(LA) {
+ withSQLConf(
+ SQLConf.SESSION_LOCAL_TIMEZONE.key -> LA.getId) {
+ save(
+ Seq("1001-01-01"),
+ "date",
+ s"before_1582_date_v$version.avro")
+ save(
+ Seq("1001-01-01 01:02:03.123"),
+ "timestamp",
+ s"before_1582_timestamp_millis_v$version.avro",
+ // scalastyle:off line.size.limit
+ Map("avroSchema" ->
+ s"""
+ | {
+ | "namespace": "logical",
+ | "type": "record",
+ | "name": "test",
+ | "fields": [
+ | {"name": "dt", "type": ["null", {"type": "long","logicalType": "timestamp-millis"}], "default": null}
+ | ]
+ | }
+ |""".stripMargin))
+ // scalastyle:on line.size.limit
+ save(
+ Seq("1001-01-01 01:02:03.123456"),
+ "timestamp",
+ s"before_1582_timestamp_micros_v$version.avro")
+ }
+ }
+ }
+
test("SPARK-31183: compatibility with Spark 2.4 in reading dates/timestamps") {
// test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together.
- def checkReadMixedFiles(fileName: String, dt: String, dataStr: String): Unit = {
+ def checkReadMixedFiles(
+ fileName: String,
+ dt: String,
+ dataStr: String,
+ checkDefaultLegacyRead: String => Unit): Unit = {
withTempPaths(2) { paths =>
paths.foreach(_.delete())
val path2_4 = getResourceAvroFilePath(fileName)
val path3_0 = paths(0).getCanonicalPath
val path3_0_rebase = paths(1).getCanonicalPath
if (dt == "date") {
- val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date"))
+ val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("dt"))
// By default we should fail to write ancient datetime values.
- var e = intercept[SparkException](df.write.format("avro").save(path3_0))
+ val e = intercept[SparkException](df.write.format("avro").save(path3_0))
assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
- // By default we should fail to read ancient datetime values.
- e = intercept[SparkException](spark.read.format("avro").load(path2_4).collect())
- assert(e.getCause.isInstanceOf[SparkUpgradeException])
+ checkDefaultLegacyRead(path2_4)
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
df.write.format("avro").mode("overwrite").save(path3_0)
@@ -1562,25 +1621,23 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr))))
}
} else {
- val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts"))
+ val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("dt"))
val avroSchema =
s"""
|{
| "type" : "record",
| "name" : "test_schema",
| "fields" : [
- | {"name": "ts", "type": {"type": "long", "logicalType": "$dt"}}
+ | {"name": "dt", "type": {"type": "long", "logicalType": "$dt"}}
| ]
|}""".stripMargin
// By default we should fail to write ancient datetime values.
- var e = intercept[SparkException] {
+ val e = intercept[SparkException] {
df.write.format("avro").option("avroSchema", avroSchema).save(path3_0)
}
assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException])
- // By default we should fail to read ancient datetime values.
- e = intercept[SparkException](spark.read.format("avro").load(path2_4).collect())
- assert(e.getCause.isInstanceOf[SparkUpgradeException])
+ checkDefaultLegacyRead(path2_4)
withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) {
df.write.format("avro").option("avroSchema", avroSchema).mode("overwrite").save(path3_0)
@@ -1600,11 +1657,33 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
}
}
- checkReadMixedFiles("before_1582_date_v2_4.avro", "date", "1001-01-01")
- checkReadMixedFiles(
- "before_1582_ts_micros_v2_4.avro", "timestamp-micros", "1001-01-01 01:02:03.123456")
- checkReadMixedFiles(
- "before_1582_ts_millis_v2_4.avro", "timestamp-millis", "1001-01-01 01:02:03.124")
+ def failInRead(path: String): Unit = {
+ val e = intercept[SparkException](spark.read.format("avro").load(path).collect())
+ assert(e.getCause.isInstanceOf[SparkUpgradeException])
+ }
+ def successInRead(path: String): Unit = spark.read.format("avro").load(path).collect()
+ Seq(
+ // By default we should fail to read ancient datetime values when parquet files don't
+ // contain Spark version.
+ "2_4_5" -> failInRead _,
+ "2_4_6" -> successInRead _
+ ).foreach { case (version, checkDefaultRead) =>
+ checkReadMixedFiles(
+ s"before_1582_date_v$version.avro",
+ "date",
+ "1001-01-01",
+ checkDefaultRead)
+ checkReadMixedFiles(
+ s"before_1582_timestamp_micros_v$version.avro",
+ "timestamp-micros",
+ "1001-01-01 01:02:03.123456",
+ checkDefaultRead)
+ checkReadMixedFiles(
+ s"before_1582_timestamp_millis_v$version.avro",
+ "timestamp-millis",
+ "1001-01-01 01:02:03.123",
+ checkDefaultRead)
+ }
}
test("SPARK-31183: rebasing microseconds timestamps in write") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org