Posted to user@spark.apache.org by Deepansh Goyal <de...@gmail.com> on 2018/04/10 17:42:27 UTC

Package reload in dapply (SparkR)

I have a native R model and am running Structured Streaming with it. Data
comes from Kafka and goes into the dapply method, where my model makes
predictions, and the results are written to a sink.

Problem: My model requires the caret package. Inside the dapply function,
caret is loaded again for every streaming job, which adds a delay of about 2 s.


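library(SparkR)
sparkR.session()

# Read the "source" topic from Kafka as a streaming SparkDataFrame
kafka <- read.stream("kafka", subscribe = "source",
                     kafka.bootstrap.servers = "10.117.172.48:9092", topic = "source")
# Keep only the message value, cast from binary to string
lines <- select(kafka, cast(kafka$value, "string"))
schema <- schema(lines)
library(caret)  # this loads caret on the driver only

# This function runs in an R worker on the executors: it times how long
# library(caret) takes there, then returns the rows unchanged
df4 <- dapply(lines, function(x) {
  print(system.time(library(caret)))
  x
}, schema)

loc <- "/path/to/checkpoint"  # placeholder: checkpoint directory (not given in the original)
q2 <- write.stream(df4, "kafka", checkpointLocation = loc, topic = "sink",
                   kafka.bootstrap.servers = "10.117.172.48:9092")
awaitTermination(q2)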

With the code above, I get the following output for every new stream:
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: lattice
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: Attaching package: ‘lattice’
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: The following object is masked from ‘package:SparkR’:
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread:     histogram
18/03/23 11:08:10 INFO BufferedStreamThread:
18/03/23 11:08:10 INFO BufferedStreamThread: Loading required package: ggplot2
18/03/23 11:08:12 INFO BufferedStreamThread:    user  system elapsed
18/03/23 11:08:12 INFO BufferedStreamThread:   1.937   0.062   1.999
18/03/23 11:08:12 INFO RRunner: Times: boot = 0.009 s, init = 0.017 s, broadcast = 0.001 s, read-input = 0.001 s, compute = 2.064 s, write-output = 0.001 s, total = 2.093 s

Please find the rest of the log file attached.

Ideally, the packages should not be loaded again. I think the R environment
is being created and destroyed with each query. Is there a solution to this,
or am I missing something?


Thanks,

Deepansh