Posted to user@spark.apache.org by shyla deshpande <de...@gmail.com> on 2017/02/07 18:28:16 UTC
Spark streaming question - SPARK-13758 Need to use an external RDD
inside DStream processing...Please help
I have a situation similar to the following and I get SPARK-13758
<https://issues.apache.org/jira/browse/SPARK-13758>.
I understand why I get this error, but I would like to know what the
recommended approach is for handling these situations.
Thanks
> // requires: import org.apache.spark.rdd.RDD
> var cached = ssc.sparkContext.parallelize(Seq("apple", "banana"))
> val words = ssc.socketTextStream(ip, port).flatMap(_.split(" "))
> words.foreachRDD((rdd: RDD[String]) => {
>   val res = rdd.map(word => (word, word.length)).collect()
>   println("words: " + res.mkString(", "))
>   cached = cached.union(rdd)  // reassigns an RDD defined outside the DStream closure
>   cached.checkpoint()
>   println("cached words: " + cached.collect().mkString(", "))
> })
Re: Spark streaming question - SPARK-13758 Need to use an external
RDD inside DStream processing...Please help
Posted by "Shixiong(Ryan) Zhu" <sh...@databricks.com>.
You can create lazily instantiated singleton instances. See
http://spark.apache.org/docs/latest/streaming-programming-guide.html#accumulators-broadcast-variables-and-checkpoints
for examples of accumulators and broadcast variables. You can use the same
approach to create your cached RDD.
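A minimal sketch of that pattern applied to an RDD, modeled on the guide's
singleton examples. The object name `CachedWords` and its methods are
illustrative, not from the guide:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Lazily instantiated singleton holding the cached RDD. Because the RDD is
// re-created on first access rather than captured in the DStream closure,
// it is not serialized into the checkpoint (the cause of SPARK-13758).
object CachedWords {

  @volatile private var instance: RDD[String] = null

  def getInstance(sc: SparkContext): RDD[String] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.parallelize(Seq("apple", "banana"))
        }
      }
    }
    instance
  }

  def update(rdd: RDD[String]): Unit = synchronized {
    instance = rdd
  }
}

// Inside the DStream processing, always go through the singleton instead of
// referencing an outer var:
//
// words.foreachRDD { rdd =>
//   val merged = CachedWords.getInstance(rdd.sparkContext).union(rdd)
//   merged.checkpoint()
//   CachedWords.update(merged)
//   println("cached words: " + merged.collect().mkString(", "))
// }
```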
Re: Spark streaming question - SPARK-13758 Need to use an external
RDD inside DStream processing...Please help
Posted by shyla deshpande <de...@gmail.com>.
Also, my cached RDD is not small. If it were, maybe I could materialize and
broadcast it.
Thanks