You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Apache Spark (Jira)" <ji...@apache.org> on 2020/08/18 07:03:00 UTC

[jira] [Commented] (SPARK-32456) Check the Distinct by assuming it as Aggregate for Structured Streaming

    [ https://issues.apache.org/jira/browse/SPARK-32456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17179410#comment-17179410 ] 

Apache Spark commented on SPARK-32456:
--------------------------------------

User 'HeartSaVioR' has created a pull request for this issue:
https://github.com/apache/spark/pull/29461

> Check the Distinct by assuming it as Aggregate for Structured Streaming
> -----------------------------------------------------------------------
>
>                 Key: SPARK-32456
>                 URL: https://issues.apache.org/jira/browse/SPARK-32456
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.0.0
>            Reporter: Yuanjian Li
>            Assignee: Yuanjian Li
>            Priority: Major
>             Fix For: 3.0.1, 3.1.0
>
>
> We want to fix 2 things here:
> 1. Give better error message for Distinct related operations in append mode that doesn't have a watermark
> Check the following example: 
> {code:java}
> val s1 = spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s1")
> val s2 = spark.readStream.format("rate").option("rowsPerSecond", 1).load().createOrReplaceTempView("s2")
> val unionRes = spark.sql("select s1.value , s1.timestamp from s1 union select s2.value, s2.timestamp from s2")
> unionResult.writeStream.option("checkpointLocation", ${pathA}).start(${pathB}){code}
> We'll get the following confusing exception:
> {code:java}
> java.util.NoSuchElementException: None.get
> 	at scala.None$.get(Option.scala:529)
> 	at scala.None$.get(Option.scala:527)
> 	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:346)
> 	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> 	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561)
> 	at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:112)
> ...
> {code}
> The union clause in SQL has the requirement of deduplication, the parser will generate {{Distinct(Union)}} and the optimizer rule {{ReplaceDistinctWithAggregate}} will change it to {{Aggregate(Union)}}. So the root cause here is the checking logic for Aggregate is missing for Distinct.
> Actually it happens for all Distinct related operations in Structured Streaming, e.g
> {code:java}
> val df = spark.readStream.format("rate").load()
> df.createOrReplaceTempView("deduptest")
> val distinct = spark.sql("select distinct value from deduptest")
> distinct.writeStream.option("checkpointLocation", ${pathA}).start(${pathB}){code}
>  
> 2. Make {{Distinct}} in complete mode runnable.
> The distinct in complete mode will throw the exception:
> {quote} 
> {{Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;}}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org