You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by YaoPau <jo...@gmail.com> on 2014/11/20 02:13:21 UTC

Joining DStream with static file

Here is my attempt:

val sparkConf = new SparkConf().setAppName("LogCounter")
val ssc =  new StreamingContext(sparkConf, Seconds(2))

val sc = new SparkContext()
val geoData = sc.textFile("data/geoRegion.csv")
            .map(_.split(','))
            .map(line => (line(0), (line(1),line(2),line(3),line(4))))

val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
topicMap).map(_._2)

val goodIPsFltrBI = lines.filter(...).map(...).filter(...) // details
removed for brevity
val vdpJoinedGeo = goodIPsFltrBI.transform(rdd =>rdd.join(geoData))

This is very wrong.  I have a feeling I should be broadcasting geoData
instead of reading it in with each task (it's a 100MB file), but I'm not
sure where to put the code that maps from the .csv to the final geoData rdd.

Also I'm not sure if geoData is even defined correctly (maybe it should use
ssc instead of sc?).  Please advise.





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joining-DStream-with-static-file-tp19329.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Joining DStream with static file

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
1. You don't have to create another sparkContext. you can simply call the
*ssc.sparkContext*

2. May be after the transformation on geoData, you could do a persist so
next time, it will be read from memory.

Thanks
Best Regards

On Thu, Nov 20, 2014 at 6:43 AM, YaoPau <jo...@gmail.com> wrote:

> Here is my attempt:
>
> val sparkConf = new SparkConf().setAppName("LogCounter")
> val ssc =  new StreamingContext(sparkConf, Seconds(2))
>
> val sc = new SparkContext()
> val geoData = sc.textFile("data/geoRegion.csv")
>             .map(_.split(','))
>             .map(line => (line(0), (line(1),line(2),line(3),line(4))))
>
> val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
> val lines = KafkaUtils.createStream(ssc, zkQuorum, group,
> topicMap).map(_._2)
>
> val goodIPsFltrBI = lines.filter(...).map(...).filter(...) // details
> removed for brevity
> val vdpJoinedGeo = goodIPsFltrBI.transform(rdd =>rdd.join(geoData))
>
> This is very wrong.  I have a feeling I should be broadcasting geoData
> instead of reading it in with each task (it's a 100MB file), but I'm not
> sure where to put the code that maps from the .csv to the final geoData
> rdd.
>
> Also I'm not sure if geoData is even defined correctly (maybe it should use
> ssc instead of sc?).  Please advise.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Joining-DStream-with-static-file-tp19329.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>