Posted to user@spark.apache.org by wangsan <wa...@163.com> on 2017/11/10 09:52:37 UTC

Generate windows on processing time in Spark Structured Streaming

Hi all,


How can I use the current processing time to generate windows in stream processing?
The window function's Scala doc says "For a streaming query, you may use the function current_timestamp to generate windows on processing time." But when I use current_timestamp as the column in the window function, an exception occurs.


Here is my code:
val socketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

socketDF.createOrReplaceTempView("words")
val windowedCounts = spark.sql(
  """
    |SELECT value as word, current_timestamp() as time, count(1) as count FROM words
    |   GROUP BY window(time, "5 seconds"), word
  """.stripMargin)

windowedCounts
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()
And here is the exception info:
Caused by: org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in
Project, Filter, Aggregate or Window, found:






Re: Generate windows on processing time in Spark Structured Streaming

Posted by Michael Armbrust <mi...@databricks.com>.
Hmmm, we should allow that.  current_timestamp() is actually deterministic
within any given batch.  Could you open a JIRA ticket?
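
A possible workaround, sketched here for illustration only: it is not from this thread and has not been tested against the poster's Spark version. The assumption is that attaching current_timestamp() as an ordinary column in a projection first, and only then grouping on window() over that column, keeps the nondeterministic expression inside a Project, which the error message lists as allowed. The object name ProcessingTimeWindows and the local[2] master are placeholders.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{current_timestamp, window}

// Hypothetical standalone app; the name and master setting are placeholders.
object ProcessingTimeWindows {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ProcessingTimeWindows")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Same socket source as in the original question.
    val socketDF = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Assumption: materialize the processing time in a projection first,
    // so the aggregation only sees a plain timestamp column.
    val withTime = socketDF
      .select($"value".as("word"), current_timestamp().as("time"))

    // Group on 5-second windows over the processing-time column.
    val windowedCounts = withTime
      .groupBy(window($"time", "5 seconds"), $"word")
      .count()

    windowedCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", "false")
      .start()
      .awaitTermination()
  }
}
```

To try it, start a text source such as `nc -lk 9999` before launching the job. Whether this avoids the AnalysisException may depend on the Spark version, so treat it as something to experiment with rather than a confirmed fix.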

On Fri, Nov 10, 2017 at 1:52 AM, wangsan <wa...@163.com> wrote:

> Hi all,
>
> How can I use current processing time to generate windows in streaming
> processing?
> The *window* function's Scala doc says "For a streaming query, you may use
> the function current_timestamp to generate windows on processing time."
> But when I use current_timestamp as the column in the window function, an
> exception occurs.
>
> Here is my code:
>
> val socketDF = spark.readStream
>   .format("socket")
>   .option("host", "localhost")
>   .option("port", 9999)
>   .load()
>
> socketDF.createOrReplaceTempView("words")
> val windowedCounts = spark.sql(
>   """
>     |SELECT value as word, current_timestamp() as time, count(1) as count FROM words
>     |   GROUP BY window(time, "5 seconds"), word
>   """.stripMargin)
>
> windowedCounts
>   .writeStream
>   .outputMode("complete")
>   .format("console")
>   .start()
>   .awaitTermination()
>
> And here is the exception info:
> Caused by: org.apache.spark.sql.AnalysisException: nondeterministic
> expressions are only allowed in
> Project, Filter, Aggregate or Window, found:
>
>
>
>