Posted to user@spark.apache.org by Symeon Meichanetzoglou <si...@gmail.com> on 2019/06/06 14:09:43 UTC

Multi-dimensional aggregations in Structured Streaming

Hi all,

We are facing a challenge where a seemingly simple use case is not
trivial to implement in Structured Streaming: one aggregation is
computed first, and then further aggregations are computed on top of
its result. Something like:
1st aggregation: val df = dfIn.groupBy("a", "b", "c", "d").agg(sum("size").as("size"))
2nd aggregation: df.groupBy("a", "b", "c").agg(sum("size"))
3rd aggregation: df.groupBy("a", "b", "d").agg(sum("size"))

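My initial idea was to do it as below, but it is wrong: in update mode
each mini-batch only contains the rows of the first aggregation that
changed, and no state is kept for the further aggregations, so they
are computed on partial data and overwrite previously written rows: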

// df is the output of the first aggregation
df.writeStream
  .outputMode(OutputMode.Update())
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    write(batchDf)
    // furtherAggregates contains the 2nd and 3rd aggregation
    furtherAggregates.foreach(agg => write(agg.compute(batchDf)))
  }
  .start()
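
To make the problem concrete, here is a minimal plain-Scala simulation
of what I believe happens (no Spark involved). Key, the sample values
and all helper names are made up for illustration. The second
mini-batch only touches one (a,b,c,d) group, so rolling up that
mini-batch alone gives a partial sum for (a,b,c) instead of the total:

```scala
// Plain-Scala simulation of update-mode output; no Spark involved.
// Key, the sample rows and all helper names are hypothetical.
final case class Key(a: String, b: String, c: String, d: String)

object PartialRollupDemo {
  // State of the first aggregation: running sum(size) per (a,b,c,d).
  def updateState(state: Map[Key, Long], input: Seq[(Key, Long)]): Map[Key, Long] =
    input.foldLeft(state) { case (s, (k, size)) =>
      s.updated(k, s.getOrElse(k, 0L) + size)
    }

  // Update mode emits only the keys touched by this mini-batch, with their new totals.
  def updatedRows(state: Map[Key, Long], input: Seq[(Key, Long)]): Map[Key, Long] = {
    val touched = input.map(_._1).toSet
    state.filter { case (k, _) => touched.contains(k) }
  }

  // Further aggregation: group by (a,b,c), summing over d.
  def rollup(rows: Map[Key, Long]): Map[(String, String, String), Long] =
    rows
      .groupBy { case (k, _) => (k.a, k.b, k.c) }
      .map { case (abc, group) => abc -> group.values.sum }

  def main(args: Array[String]): Unit = {
    val batch1 = Seq(
      Key("a1", "b1", "c1", "d1") -> 10L,
      Key("a1", "b1", "c1", "d2") -> 5L
    )
    val batch2 = Seq(Key("a1", "b1", "c1", "d2") -> 7L)

    val state1 = updateState(Map.empty, batch1)
    val state2 = updateState(state1, batch2)

    // What foreachBatch sees in mini-batch 2: only the d2 row (total 12).
    println(s"rollup of mini-batch 2 only: ${rollup(updatedRows(state2, batch2))}")
    // What we actually want: d1 (10) + d2 (12) = 22.
    println(s"rollup of full state:        ${rollup(state2)}")
  }
}
```

The first rollup is what ends up overwriting the previously written
(a,b,c) row in the sink.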

It doesn't seem feasible to achieve the goal using foreachBatch. I see
two possible solutions:
1. Run 3 independent queries in parallel, all reading the same input.
This is very inefficient as the first aggregation contains some
expensive processing (e.g. lookup on a broadcast variable and update
of a column).
2. Calculate the first aggregation and write it to Kafka. Then run the
2 further aggregations independently, reading the Kafka topic that was
written by the first aggregation. The problem with this is that the
first aggregation writes in update mode (we cannot use append because
we cannot wait for the watermark to expire) so in Kafka we will end up
with many updates for a single aggregated row. Is it the
responsibility of the later aggregations to resolve this? Maybe by
keeping only the record with the latest timestamp in each mini-batch
(for a given set of aggregated columns)?
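
The "keep only the latest record" idea from solution 2 could look
roughly like the sketch below. It assumes Spark SQL is on the
classpath, that the records read back from Kafka have already been
parsed into columns a, b, c, d and size, and that the first
aggregation attaches a timestamp column, here called updatedAt. That
column name and the helper latestPerKey are hypothetical. Note that
this only deduplicates within one mini-batch; the same key can still
show up again in a later one.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

object LatestPerKey {
  // Assumes batchDf has columns a, b, c, d, size and a hypothetical
  // `updatedAt` column set when the first aggregation wrote the row.
  def latestPerKey(batchDf: DataFrame): DataFrame = {
    // Within each (a,b,c,d) group, order the updates newest first.
    val newestFirst = Window
      .partitionBy("a", "b", "c", "d")
      .orderBy(col("updatedAt").desc)

    batchDf
      .withColumn("rn", row_number().over(newestFirst))
      .filter(col("rn") === 1) // keep only the most recent update per group
      .drop("rn")
  }
}
```

It would be called on batchDf inside foreachBatch of the later
queries, before any further processing of the mini-batch.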

To me, solution 2 sounds like the way to go if the issue that I
mentioned can be resolved. What do you think?

Thanks!

Symeon

PS: I found a similar question here:
https://stackoverflow.com/questions/41011002/multiple-aggregations-in-spark-structured-streaming
