Posted to issues@spark.apache.org by "Gaspar Muñoz (Jira)" <ji...@apache.org> on 2022/01/13 13:13:00 UTC

[jira] [Updated] (SPARK-37898) Error reading old dates when AQE is enabled in Spark 3.1. Works when AQE is disabled

     [ https://issues.apache.org/jira/browse/SPARK-37898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gaspar Muñoz  updated SPARK-37898:
----------------------------------
    Description: 
Hi guys, 
 
I was testing a Spark job that fails, and I came across behavior that is not consistent across Spark versions. I reduced my code so it can be replicated easily in a plain spark-shell. Note: the code logic probably does not make sense :)
 
The following snippet:
 
 - Works with Spark 3.1.2 and 3.1.3-rc when AQE is disabled
 - Fails with Spark 3.1.2 and 3.1.3-rc when AQE is enabled (AQE is toggled as sketched after the snippet)
 - Always works with Spark 3.2.0
{code:java}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window


spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY")
spark.conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")

val dataset = spark.read.parquet("/tmp/parquet-output")

val window = Window.orderBy(dataset.col("date").desc)
val resultDataset = dataset.withColumn("rankedFilterOverPartition", dense_rank().over(window)).filter("rankedFilterOverPartition = 1").drop("rankedFilterOverPartition")
println(resultDataset.rdd.getNumPartitions){code}
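 
For reference, AQE is toggled per run with the standard spark.sql.adaptive.enabled flag; a minimal sketch of how I switch it in the same spark-shell session (nothing specific to this job):
{code:java}
// Failing case on 3.1.x: turn AQE on before running the snippet above
spark.conf.set("spark.sql.adaptive.enabled", "true")

// Passing case: turn AQE off (the default in 3.1.x) and re-run the same snippet
spark.conf.set("spark.sql.adaptive.enabled", "false"){code}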
 
The data in /tmp/parquet-output was written beforehand with Spark 2.2 using this snippet:
 
{code:java}
import spark.implicits._
import java.sql.Timestamp
import org.apache.spark.sql.functions._

case class MyCustomClass(id_col: Int, date: String, timestamp_col: java.sql.Timestamp)

val dataset = Seq(MyCustomClass(1, "0001-01-01", Timestamp.valueOf("1000-01-01 10:00:00")), MyCustomClass(2, "0001-01-01", Timestamp.valueOf("1000-01-01 10:00:00"))).toDF
 
dataset.select($"id_col", $"date".cast("date"), $"timestamp_col").write.mode("overwrite").parquet("/tmp/parquet-output"){code}
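 
To double-check the fixture, the written files can be read back directly (a trivial sketch; with the LEGACY read settings from the first snippet this just shows the two rows with the old date and timestamp):
{code:java}
// Sanity check: read the Parquet files written by Spark 2.2 and print the rows
spark.read.parquet("/tmp/parquet-output").show(false){code}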
 
The error is:
{code:java}
scala> println(resultDataset.rdd.getNumPartitions)
22/01/13 13:45:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/01/13 13:45:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/01/13 13:45:17 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: reading dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z from Parquet files can be ambiguous, as the files may be written by Spark 2.x or legacy versions of Hive, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic Gregorian calendar. See more details in SPARK-31404. You can set spark.sql.legacy.parquet.datetimeRebaseModeInRead to 'LEGACY' to rebase the datetime values w.r.t. the calendar difference during reading. Or set spark.sql.legacy.parquet.datetimeRebaseModeInRead to 'CORRECTED' to read the datetime values as it is.
at org.apache.spark.sql.execution.datasources.DataSourceUtils$.newRebaseExceptionInRead(DataSourceUtils.scala:147)
at org.apache.spark.sql.execution.datasources.DataSourceUtils.newRebaseExceptionInRead(DataSourceUtils.scala){code}
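 
For what it's worth, the settings the exception asks for are already present in the (driver) session at the point of failure; they can be checked right before triggering the action (a trivial sketch):
{code:java}
// Confirm the rebase settings mentioned by the exception are set in the session
println(spark.conf.get("spark.sql.legacy.parquet.datetimeRebaseModeInRead")) // LEGACY
println(spark.conf.get("spark.sql.legacy.parquet.int96RebaseModeInRead"))    // LEGACY{code}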
 
 
Regards


> Error reading old dates when AQE is enabled in Spark 3.1. Works when AQE is disabled
> ------------------------------------------------------------------------------------
>
>                 Key: SPARK-37898
>                 URL: https://issues.apache.org/jira/browse/SPARK-37898
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.2
>            Reporter: Gaspar Muñoz 
>            Priority: Major
>


