You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Willi Raschkowski (Jira)" <ji...@apache.org> on 2021/08/13 11:44:00 UTC

[jira] [Comment Edited] (SPARK-35324) Spark SQL configs not respected in RDDs

    [ https://issues.apache.org/jira/browse/SPARK-35324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17398607#comment-17398607 ] 

Willi Raschkowski edited comment on SPARK-35324 at 8/13/21, 11:43 AM:
----------------------------------------------------------------------

Here's an example of that:

{code:title=Rebase error}
Lost task 20.0 in stage 1.0 (TID 1, redacted, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$1(Executor.scala:474)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:477)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: writing dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z into Parquet files can be dangerous, as the files may be read by Spark 2.x or legacy versions of Hive later, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. You can set spark.sql.legacy.parquet.datetimeRebaseModeInWrite to 'LEGACY' to rebase the datetime values w.r.t. the calendar difference during writing, to get maximum interoperability. Or set spark.sql.legacy.parquet.datetimeRebaseModeInWrite to 'CORRECTED' to write the datetime values as it is, if you are 100% sure that the written files will only be read by Spark 3.0+ or other systems that use Proleptic Gregorian calendar.
	at org.apache.spark.sql.execution.datasources.DataSourceUtils$.newRebaseExceptionInWrite(DataSourceUtils.scala:143)
	at org.apache.spark.sql.execution.datasources.DataSourceUtils$.$anonfun$creteDateRebaseFuncInWrite$1(DataSourceUtils.scala:163)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeWriter$4(ParquetWriteSupport.scala:169)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeWriter$4$adapted(ParquetWriteSupport.scala:168)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$writeFields$1(ParquetWriteSupport.scala:146)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeField(ParquetWriteSupport.scala:463)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.writeFields(ParquetWriteSupport.scala:146)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$write$1(ParquetWriteSupport.scala:136)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:451)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:136)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:54)
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
	at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
	at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:140)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:278)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
	... 9 more
{code}

where we're already setting {{...rebaseModeInWrite: legacy}}.

I appreciate that we can use SQL instead of RDDs for most use cases but in some cases we're still forced to go down to RDDs and since Spark 3 we're effectively blocking those cases from reading / writing old datetimes.


was (Author: raschkowski):
Here's an example of that:

{code:title=Rebase error}
Lost task 20.0 in stage 1.0 (TID 1, redacted, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$1(Executor.scala:474)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:477)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: writing dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z into Parquet files can be dangerous, as the files may be read by Spark 2.x or legacy versions of Hive later, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. You can set spark.sql.legacy.parquet.datetimeRebaseModeInWrite to 'LEGACY' to rebase the datetime values w.r.t. the calendar difference during writing, to get maximum interoperability. Or set spark.sql.legacy.parquet.datetimeRebaseModeInWrite to 'CORRECTED' to write the datetime values as it is, if you are 100% sure that the written files will only be read by Spark 3.0+ or other systems that use Proleptic Gregorian calendar.
	at org.apache.spark.sql.execution.datasources.DataSourceUtils$.newRebaseExceptionInWrite(DataSourceUtils.scala:143)
	at org.apache.spark.sql.execution.datasources.DataSourceUtils$.$anonfun$creteDateRebaseFuncInWrite$1(DataSourceUtils.scala:163)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeWriter$4(ParquetWriteSupport.scala:169)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeWriter$4$adapted(ParquetWriteSupport.scala:168)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$writeFields$1(ParquetWriteSupport.scala:146)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeField(ParquetWriteSupport.scala:463)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.writeFields(ParquetWriteSupport.scala:146)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$write$1(ParquetWriteSupport.scala:136)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:451)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:136)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:54)
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
	at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:182)
	at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:44)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:40)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.write(FileFormatDataWriter.scala:140)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:278)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
	... 9 more
{code}

I appreciate that we can use SQL instead of RDDs for most use cases but in some cases we're still forced to go down to RDDs and since Spark 3 we're effectively blocking those cases from reading / writing old datetimes.

> Spark SQL configs not respected in RDDs
> ---------------------------------------
>
>                 Key: SPARK-35324
>                 URL: https://issues.apache.org/jira/browse/SPARK-35324
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, SQL
>    Affects Versions: 3.0.2, 3.1.1
>            Reporter: Willi Raschkowski
>            Priority: Major
>         Attachments: Screen Shot 2021-05-06 at 00.33.10.png, Screen Shot 2021-05-06 at 00.35.10.png
>
>
> When reading a CSV file, {{spark.sql.timeParserPolicy}} is respected in actions on the resulting dataframe. But it's ignored in actions on dataframe's RDD.
> E.g. say to parse dates in a CSV you need {{spark.sql.timeParserPolicy}} to be set to {{LEGACY}}. If you set the config, {{df.collect}} will work as you'd expect. However, {{df.collect.rdd}} will fail because it'll ignore the override and read the config value as {{EXCEPTION}}.
> For instance:
> {code:java|title=test.csv}
> date
> 2/6/18
> {code}
> {code:java}
> scala> spark.conf.set("spark.sql.legacy.timeParserPolicy", "legacy")
> scala> val df = {
>      |   spark.read
>      |     .option("header", "true")
>      |     .option("dateFormat", "MM/dd/yy")
>      |     .schema("date date")
>      |     .csv("/Users/wraschkowski/Downloads/test.csv")
>      | }
> df: org.apache.spark.sql.DataFrame = [date: date]
> scala> df.show
> +----------+                                                                    
> |      date|
> +----------+
> |2018-02-06|
> +----------+
> scala> df.count
> res3: Long = 1
> scala> df.rdd.count
> 21/05/06 00:06:18 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
> org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: Fail to parse '2/6/18' in the new parser. You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.
> 	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:150)
> 	at org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper$$anonfun$checkParsedDiff$1.applyOrElse(DateTimeFormatterHelper.scala:141)
> 	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
> 	at org.apache.spark.sql.catalyst.util.Iso8601DateFormatter.$anonfun$parse$1(DateFormatter.scala:61)
> 	at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.java:23)
> 	at scala.Option.getOrElse(Option.scala:189)
> 	at org.apache.spark.sql.catalyst.util.Iso8601DateFormatter.parse(DateFormatter.scala:58)
> 	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$21(UnivocityParser.scala:202)
> 	at org.apache.spark.sql.catalyst.csv.UnivocityParser.nullSafeDatum(UnivocityParser.scala:238)
> 	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$20(UnivocityParser.scala:200)
> 	at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:291)
> 	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:254)
> 	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:396)
> 	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:60)
> 	at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:400)
> 	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
> 	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
> 	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
> 	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> 	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> 	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
> 	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1866)
> 	at org.apache.spark.rdd.RDD.$anonfun$count$1(RDD.scala:1253)
> 	at org.apache.spark.rdd.RDD.$anonfun$count$1$adapted(RDD.scala:1253)
> 	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:131)
> 	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> 	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> 	at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: java.time.format.DateTimeParseException: Text '2/6/18' could not be parsed at index 0
> 	at java.base/java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:2046)
> 	at java.base/java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1874)
> 	at org.apache.spark.sql.catalyst.util.Iso8601DateFormatter.$anonfun$parse$1(DateFormatter.scala:59)
> 	... 32 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org