You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Li Zuwei <li...@gmail.com> on 2017/10/09 04:44:57 UTC

[Spark Structured Streaming] How to select events by latest timestamp and aggregate count

I would like to perform structured streaming aggregation with a windowing period. Given this following data schema. The objective is to filter by the latest occurring event based on user. Then aggregate the count of each event type for each location.

time    location   user   type
 1        A         1      one
 2        A         1      two
 1        B         2      one
 2        B         2      one
 1        A         3      two
 1        A         4      one
Sample output:

location   countOne   countTwo
    A          1         2
    B          1         0
something like the following:

val aggTypes = df
  .select($"location", $"time", $"user", $"type")
  .groupBy($"user")
  .agg(max($"timestamp") as 'timestamp)
  .select("*")
  .withWatermark("timestamp", conf.kafka.watermark.toString + " seconds")
  .groupBy(functions.window($"timestamp", DataConstant.t15min.toString + " seconds", DataConstant.t1min.toString + " seconds", $"location")
  .agg(count(when($"type" === "one", $"type")) as 'countOne, count(when($"type" === "two", $"type" as 'countTwo)))
  .drop($"window")
As structured streaming does not support multiple aggregations and Non-time-based windows are not supported on streaming DataFrames/Datasets. I am not sure if it is possible to achieve the desired output in 1 streaming query.

Any help is appreciated.