You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Yuanjian Li (Jira)" <ji...@apache.org> on 2020/07/28 08:41:00 UTC

[jira] [Updated] (SPARK-32456) Give better error message for Distinct related operations in append mode without watermark

     [ https://issues.apache.org/jira/browse/SPARK-32456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuanjian Li updated SPARK-32456:
--------------------------------
    Summary: Give better error message for Distinct related operations in append mode without watermark  (was: Give better error message for union streams in append mode that don't have a watermark)

> Give better error message for Distinct related operations in append mode without watermark
> ------------------------------------------------------------------------------------------
>
>                 Key: SPARK-32456
>                 URL: https://issues.apache.org/jira/browse/SPARK-32456
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.0.0
>            Reporter: Yuanjian Li
>            Priority: Major
>
> Check the following example:
>  
> {code:java}
> val s1 = spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s1")
> val s2 = spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s2")
> val unionRes = spark.sql("select s1.value , s1.timestamp from s1 union select s2.value, s2.timestamp from s2")
> unionResult.writeStream.option("checkpointLocation", ${pathA}).start(${pathB})
> {code}
>  
> We'll get the following confusing exception:
> {code:java}
> java.util.NoSuchElementException: None.get
> 	at scala.None$.get(Option.scala:529)
> 	at scala.None$.get(Option.scala:527)
> 	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:346)
> 	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> 	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561)
> 	at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:112)
> ...
> {code}
> The union clause in SQL has the requirement of deduplication, the parser will generate {{Distinct(Union)}} and the optimizer rule {{ReplaceDistinctWithAggregate}} will change it to {{Aggregate(Union)}}. So the root cause here is the checking logic for Aggregate is missing for Distinct.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org