Posted to user@spark.apache.org by wangsan <wa...@163.com> on 2017/11/10 09:52:37 UTC
Generate windows on processing time in Spark Structured Streaming
Hi all,
How can I use the current processing time to generate windows in stream processing?
The window function's Scala doc says "For a streaming query, you may use the function current_timestamp to generate windows on processing time." But when I use current_timestamp as the time column in the window function, an exception is thrown.
Here is my code:
val socketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
socketDF.createOrReplaceTempView("words")
val windowedCounts = spark.sql(
  """
    |SELECT value as word, current_timestamp() as time, count(1) as count FROM words
    | GROUP BY window(time, "5 seconds"), word
  """.stripMargin)
windowedCounts
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()
And here is the exception info:
Caused by: org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in
Project, Filter, Aggregate or Window, found:
Re: Generate windows on processing time in Spark Structured Streaming
Posted by Michael Armbrust <mi...@databricks.com>.
Hmmm, we should allow that. current_timestamp() is actually deterministic
within any given batch. Could you open a JIRA ticket?
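
Editor's note, not part of the original reply: a possible workaround is to materialize the processing time as an ordinary column before grouping, so that current_timestamp() sits in a Project and window() only sees a plain column reference. The sketch below reuses the socket source from the question and has not been verified against any particular Spark version. The object name ProcessingTimeWindows, the local[2] master, and the "truncate" console option are assumptions added for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, current_timestamp, window}

// Hypothetical object name; not from the original thread.
object ProcessingTimeWindows {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ProcessingTimeWindows")
      .master("local[2]") // assumption: local run for experimentation
      .getOrCreate()

    // Same socket source as in the question.
    val socketDF = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Add the processing time as a regular column first (a Project),
    // instead of calling current_timestamp() inside the grouping expression.
    val withTime = socketDF.withColumn("time", current_timestamp())

    // Group by a 5-second tumbling window over that column, plus the word.
    val windowedCounts = withTime
      .groupBy(window(col("time"), "5 seconds"), col("value").as("word"))
      .count()

    windowedCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", "false") // show the full window struct
      .start()
      .awaitTermination()
  }
}
```

To try it, start a text source with something like `nc -lk 9999` before launching the query, then type words into that terminal. Whether this avoids the AnalysisException may depend on the Spark version in use.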