Posted to user@spark.apache.org by shyla deshpande <de...@gmail.com> on 2017/02/07 18:28:16 UTC

Spark streaming question - SPARK-13758 Need to use an external RDD inside DStream processing...Please help

I have a situation similar to the following, and I get the error described in SPARK-13758 <https://issues.apache.org/jira/browse/SPARK-13758>.


I understand why I get this error, but I would like to know what the
recommended approach is for dealing with these situations.


Thanks


>     import org.apache.spark.rdd.RDD
>
>     var cached = ssc.sparkContext.parallelize(Seq("apple", "banana"))
>     val words = ssc.socketTextStream(ip, port).flatMap(_.split(" "))
>     words.foreachRDD((rdd: RDD[String]) => {
>       val res = rdd.map(word => (word, word.length)).collect()
>       println("words: " + res.mkString(", "))
>       // Reassigning and checkpointing an RDD captured from outside the
>       // DStream: after the job is restarted from a checkpoint, this RDD
>       // refers to a stale SparkContext, which triggers SPARK-13758.
>       cached = cached.union(rdd)
>       cached.checkpoint()
>       println("cached words: " + cached.collect().mkString(", "))
>     })

Re: Spark streaming question - SPARK-13758 Need to use an external RDD inside DStream processing...Please help

Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
You can create lazily instantiated singleton instances. See
http://spark.apache.org/docs/latest/streaming-programming-guide.html#accumulators-broadcast-variables-and-checkpoints
for examples of accumulators and broadcast variables. You can use the same
approach to create your cached RDD.
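
A minimal sketch of that pattern applied to your cached RDD could look like
the following (CachedWords and its methods are illustrative names modeled on
the guide's singleton examples, not a Spark API):

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    object CachedWords {
      @volatile private var instance: RDD[String] = null

      // Lazily create the RDD from the active SparkContext, so that after
      // a restart from checkpoint it is rebuilt against the new context
      // instead of capturing a stale one.
      def getInstance(sc: SparkContext): RDD[String] = {
        if (instance == null) {
          synchronized {
            if (instance == null) {
              instance = sc.parallelize(Seq("apple", "banana"))
            }
          }
        }
        instance
      }

      // Swap in the updated RDD after unioning in a new batch.
      def update(rdd: RDD[String]): Unit = synchronized {
        instance = rdd
      }
    }

Inside foreachRDD you would then obtain the instance from the batch RDD's own
context, e.g. val cached = CachedWords.getInstance(rdd.sparkContext), do the
union and checkpoint, and call CachedWords.update(...) instead of reassigning
a driver-side var.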


Re: Spark streaming question - SPARK-13758 Need to use an external RDD inside DStream processing...Please help

Posted by shyla deshpande <de...@gmail.com>.
And my cached RDD is not small. If it were, maybe I could materialize and
broadcast it.
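
For reference, what I mean would be roughly the following (an illustrative
sketch with made-up names, only viable when the collected data fits in
driver memory):

    val materialized = cached.collect()
    val broadcastWords = ssc.sparkContext.broadcast(materialized)

    words.foreachRDD { rdd =>
      // The broadcast value is a plain Array, safe to use inside the closure.
      val known = broadcastWords.value.toSet
      val found = rdd.filter(known.contains).collect()
      println("known words seen: " + found.mkString(", "))
    }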

Thanks
