Posted to user@spark.apache.org by Spico Florin <sp...@gmail.com> on 2019/10/31 09:16:12 UTC
[Spark Streaming] Apply multiple ML pipelines(Models) to the same stream
Hello!
I have a use case where I have to apply multiple already-trained models
(e.g. M1, M2, ..., Mn) to the same Spark stream (fetched from Kafka).
The models were trained using the isolation forest algorithm from here:
https://github.com/titicaca/spark-iforest
I have found something similar to my case here:
https://www.youtube.com/watch?v=EhRHQPCdldI, but unfortunately I don't know
whether the company Genesys (formerly AltoCloud) has made this API
(StreamPipeline, Heterogenous Pipeline) open source.
I handled this with the code sketched below, but I don't know how optimal it is.
import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.DataFrame

// read the stream (spark and broker are defined elsewhere)
val kafkaStreamDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", broker)
  .option("subscribe", "topic")
  .load

val myModels = Array("m1", "m2", "m3", "m4")

// parallelize the input models so that multiple threads handle the
// same stream; otherwise blocked??
myModels.par.foreach(lm => {
  // load the model
  val model = PipelineModel.load(lm)
  kafkaStreamDF.writeStream.foreachBatch({ (batchDF: DataFrame, batchId: Long) =>
    // apply the model and write the scored batch as JSON
    model.transform(batchDF)
      .selectExpr("CAST(to_json(struct(*)) AS STRING) AS value")
      .write
      .format("json")
      .save("anom/" + lm + System.currentTimeMillis())
  }).start().awaitTermination() // blocks this thread until the query stops
})
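For comparison, here is a minimal sketch of a different layout, not a confirmed best practice: a single streaming query whose foreachBatch caches each micro-batch and applies every model to it, so Kafka is read once per batch instead of once per model. It follows the persist / write / unpersist pattern that the Structured Streaming guide describes for writing one batch to several outputs. The object name, the broker address, the checkpoint path and the per-model output directories are assumptions made for illustration, and the sketch assumes each pipeline can consume the raw Kafka columns, as the code above does.

```scala
import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.{DataFrame, SparkSession}

object MultiModelStream {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("multi-model-stream").getOrCreate()
    val broker = "localhost:9092" // assumption: replace with the real broker list

    val kafkaStreamDF = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", broker)
      .option("subscribe", "topic")
      .load()

    // Load every model once, on the driver, before the query starts.
    val models: Seq[(String, PipelineModel)] =
      Seq("m1", "m2", "m3", "m4").map(path => path -> PipelineModel.load(path))

    // An explicitly typed function value avoids the foreachBatch
    // overload ambiguity that a bare lambda can hit on Scala 2.12.
    val scoreBatch: (DataFrame, Long) => Unit = (batchDF, batchId) => {
      batchDF.persist() // cache so the batch is not recomputed for each model
      try {
        models.foreach { case (name, model) =>
          model.transform(batchDF)
            .write
            .mode("append")
            .json(s"anom/$name") // hypothetical output directory per model
        }
      } finally {
        batchDF.unpersist()
      }
    }

    kafkaStreamDF.writeStream
      .foreachBatch(scoreBatch)
      .option("checkpointLocation", "checkpoints/multi-model") // hypothetical path
      .start()
      .awaitTermination()
  }
}
```

In this sketch the models run one after another inside each batch, so a slow model delays the others; the one-query-per-model layout above keeps them independent at the cost of reading the topic once per model.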
Questions:
1. Is there any Spark API for handling such a use case?
2. If yes, where can I find it?
3. If no, how can I optimally implement this?
Any ideas or suggestions are highly appreciated.
Thanks.
Florin