Posted to user@spark.apache.org by Richard Reitmeyer <Ri...@symantec.com.INVALID> on 2019/10/15 21:36:06 UTC

apache-spark Structured Stateful Streaming with window / SPARK-21641

What’s the right way to use Structured Streaming with both state and windows?

Judging by slides 26 and 31 of https://www.slideshare.net/databricks/arbitrary-stateful-aggregations-using-structured-streaming-in-apache-spark, stateful processing of events for every device every minute should look like this:

events
  .withWatermark("event_time", "2 minutes")
  .groupBy("device_id", window("event_time", "1 minute"))
  .flatMapWithState(…)(…)
  …

But with Apache Spark 2.4.4 this won’t work and it looks like https://issues.apache.org/jira/browse/SPARK-21641 is to blame.

What’s the recommended way to handle this?



Re: apache-spark Structured Stateful Streaming with window / SPARK-21641

Posted by Jungtaek Lim <ka...@gmail.com>.
First of all, I take it you are asking about using an "arbitrary stateful
operation" together with "native support of windowing".
(Even if you don't deal with state directly, you are using state whenever
you use a stateful operation such as streaming aggregation or a
stream-stream join.)

In short, there is no native support for windowing when you use
flatMapGroupsWithState. "Arbitrary" here really means "low level", so you
need to implement the window logic yourself. The example below shows how to
handle windows in flatMapGroupsWithState (a processing-time session
window):
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala
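
As a rough illustration of "window logic by yourself": the linked example
deals with session windows, whereas the question is about fixed one-minute
windows. For those, the core of the manual logic is assigning each event
timestamp to an epoch-aligned bucket. The sketch below is plain Scala with
no Spark dependency; the object and method names are hypothetical and are
not part of any Spark API.

```scala
object ManualWindows {

  /** Start (inclusive, epoch millis) of the tumbling window that contains eventTimeMs. */
  def tumblingWindowStart(eventTimeMs: Long, windowMs: Long): Long = {
    require(windowMs > 0, "window length must be positive")
    // floorMod keeps the bucket aligned for negative timestamps as well.
    eventTimeMs - java.lang.Math.floorMod(eventTimeMs, windowMs)
  }

  /**
   * Starts of every sliding window [start, start + windowMs) that contains
   * eventTimeMs, latest window first.
   */
  def slidingWindowStarts(eventTimeMs: Long, windowMs: Long, slideMs: Long): List[Long] = {
    require(slideMs > 0 && windowMs >= slideMs, "need 0 < slide <= window")
    // Latest window start that is not after the event.
    val lastStart = eventTimeMs - java.lang.Math.floorMod(eventTimeMs, slideMs)
    // Walk backwards while the window still covers the event (end is exclusive).
    Iterator.iterate(lastStart)(_ - slideMs)
      .takeWhile(start => start > eventTimeMs - windowMs)
      .toList
  }
}
```

A function like tumblingWindowStart could be used inside the groupByKey
key function, so that the key becomes (device id, window start) and each
state instance covers exactly one device and one window.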

There are two kinds of API on a Spark Dataset: "typed" and "untyped". Most
features are available in the untyped API (the one you referred to), but
not all of them: (flat)MapGroupsWithState relies on the typed API. The
"window" function is not supported in the typed API, so the two cannot be
used together directly. I guess the restriction was introduced for
simplicity and performance, but I'm not 100% sure.

You can still use the "window" function to populate rows with window
bounds, map them via the typed API, and then apply groupByKey. Take a look
at the query below: it produces more rows than there are input rows,
because it applies a sliding window.

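import java.sql.Timestamp
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{col, window}

// Assumes a SparkSession `spark` is in scope, together with
// `import spark.implicits._` and an implicit SQLContext
// (e.g. `implicit val sqlCtx = spark.sqlContext`).
val inputData = MemoryStream[(Long, Long)]

inputData.toDF()
  .selectExpr("_1", "CAST(_2 / 1000 AS TIMESTAMP) AS timestamp")
  // Sliding window: each row is emitted once per window it falls into.
  .select(col("*"), window(col("timestamp"), "10 seconds", "5 seconds").as("window"))
  .select(col("_1"), col("window.start").as("window_start"), col("window.end").as("window_end"))
  .as[(Long, Timestamp, Timestamp)]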
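
To connect this back to the original question, here is a minimal sketch of
how the remaining steps (groupByKey, then flatMapGroupsWithState) might
look for one-minute tumbling windows per device. Several things here are
assumptions rather than anything stated in this thread: the case classes,
the object and method names, the device_id/event_time column names (taken
from the question), and the choice of a simple running count as the state.
It uses GroupStateTimeout.NoTimeout to stay short, which means state for
old windows is never removed; a real job would most likely want an
event-time timeout on top of a watermark.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical row shapes, for illustration only.
case class WindowedEvent(device_id: String, event_time: Timestamp,
                         window_start: Timestamp, window_end: Timestamp)
case class WindowCount(device_id: String, window_start: Timestamp, count: Long)

object WindowedStateSketch {

  // State function: called once per (device, window start) key in each
  // micro-batch that has rows for that key. The state is the number of
  // events seen so far for the key.
  def updateCount(
      key: (String, Timestamp),
      rows: Iterator[WindowedEvent],
      state: GroupState[Long]): Iterator[WindowCount] = {
    val total = state.getOption.getOrElse(0L) + rows.size
    state.update(total)
    Iterator(WindowCount(key._1, key._2, total))
  }

  // `events` is assumed to be a streaming DataFrame with columns
  // device_id: String and event_time: Timestamp.
  def countsPerDeviceWindow(events: DataFrame): Dataset[WindowCount] = {
    import events.sparkSession.implicits._

    events
      // Untyped API: attach the tumbling-window bounds to every row.
      .select(col("device_id"), col("event_time"),
        window(col("event_time"), "1 minute").as("w"))
      .select(col("device_id"), col("event_time"),
        col("w.start").as("window_start"), col("w.end").as("window_end"))
      // Typed API from here on.
      .as[WindowedEvent]
      .groupByKey(e => (e.device_id, e.window_start))
      .flatMapGroupsWithState[Long, WindowCount](
        OutputMode.Update(), GroupStateTimeout.NoTimeout())(updateCount _)
  }
}
```

Because the state function is declared with OutputMode.Update(), the
streaming query that consumes the result would need to be started in
update output mode as well.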

Hope it helps.

Thanks,
Jungtaek Lim (HeartSaVioR)



