Posted to user@spark.apache.org by Filip <Fi...@enghouse.com.INVALID> on 2021/01/21 15:36:29 UTC

Spark structured streaming - efficient way to do lots of aggregations on the same input files

Hi,

I'm considering using Apache Spark for the development of an application.
This would replace a legacy program which reads CSV files and does lots
(tens/hundreds) of aggregations on them. The aggregations are fairly simple
(counts, sums, etc.), with filtering conditions applied to some of the
columns.

I prefer structured streaming for its simplicity and low latency. I'd also
like to use full SQL queries (via createOrReplaceTempView). However, running
multiple queries means Spark will re-read the input files for each one of
them, which seems very inefficient for my use case.

Does anyone have any suggestions? The only thing I have found so far involves
using foreachBatch and manually updating my aggregates, but I think there
should be a simpler solution for this use case.





Re: Spark structured streaming - efficient way to do lots of aggregations on the same input files

Posted by Filip <Fi...@enghouse.com.INVALID>.
Hi,

I don't have any code for the foreachBatch approach; I only mentioned it
because of this answer to my question on SO:
https://stackoverflow.com/a/65803718/1017130

I have added some very simple code below that I think shows what I'm trying
to do:
import org.apache.spark.sql.types._

// Schema of the tab-separated input files.
val schema = StructType(Array(
  StructField("senderId1", LongType),
  StructField("senderId2", LongType),
  StructField("destId1", LongType),
  StructField("eventType", IntegerType),
  StructField("cost", LongType)
))

val fileStreamDf = spark.readStream
  .schema(schema)
  .option("delimiter", "\t")
  .csv("D:\\SparkTest")

fileStreamDf.createOrReplaceTempView("myTable")

// Each of these starts its own streaming query over the same file source.
spark.sql("SELECT senderId1, count(*) AS num_events FROM myTable GROUP BY senderId1 HAVING count(*) > 10000")
  .writeStream.format("console").outputMode("complete").start()

spark.sql("SELECT senderId2, sum(cost) AS total_cost FROM myTable WHERE eventType = 3 GROUP BY senderId2 HAVING sum(cost) > 500")
  .writeStream.format("console").outputMode("complete").start()

spark.sql("SELECT destId1, count(*) AS num_events FROM myTable WHERE eventType = 5 GROUP BY destId1 HAVING count(*) > 1000")
  .writeStream.format("console").outputMode("complete").start()

Of course, this is simplified; there are many more columns and the queries
should also group by time period, but I didn't want to complicate the example.
Even so, these three queries run on the same input files, yet Spark would need
to read the files from disk three times. Those extra reads are what I'm trying
to avoid.
In the real application, the number of queries would be much higher and
dynamic (they are generated from configuration set up by the end users).
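
For what it's worth, here is a rough, untested sketch of what I understand the
foreachBatch suggestion from that SO answer to mean: cache each micro-batch
once and run all the aggregations against the cached batch instead of the file
source. The helper name runAggregations and the view name myBatch are just
placeholders of mine, and the results only cover a single batch, so I would
still have to merge them into running totals myself:

def runAggregations(batchDf: org.apache.spark.sql.DataFrame, batchId: Long): Unit = {
  // Cache the batch so the files are read once per micro-batch,
  // no matter how many aggregations run against it.
  batchDf.persist()
  batchDf.createOrReplaceTempView("myBatch")

  // Both aggregations scan the cached batch, not the files on disk.
  spark.sql("SELECT senderId1, count(*) AS num_events FROM myBatch GROUP BY senderId1 HAVING count(*) > 10000").show()
  spark.sql("SELECT senderId2, sum(cost) AS total_cost FROM myBatch WHERE eventType = 3 GROUP BY senderId2 HAVING sum(cost) > 500").show()

  // These results are per batch only; maintaining running totals across
  // batches is the "manual updating" part I was referring to.
  batchDf.unpersist()
}

fileStreamDf.writeStream.foreachBatch(runAggregations _).start()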





Re: Spark structured streaming - efficient way to do lots of aggregations on the same input files

Posted by Jacek Laskowski <ja...@japila.pl>.
Hi Filip,

Care to share the code behind "The only thing I found so far involves using
foreachBatch and manually updating my aggregates."?

I'm not completely sure I understand your use case and hope the code could
shed more light on it. Thank you.

Pozdrawiam,
Jacek Laskowski
----
https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski
