You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by ashrowty <as...@gmail.com> on 2015/08/30 05:21:36 UTC

Spark shell and StackOverFlowError

I am running the Spark shell (1.2.1) in local mode and I have a simple
RDD[(String,String,Double)] with about 10,000 objects in it. I get a
StackOverFlowError each time I try to run the following code (the code
itself is just representative of other logic where I need to pass in a
variable). I tried broadcasting the variable too, but no luck .. missing
something basic here -

val rdd = sc.makeRDD(List(<Data read from file>)
val a=10
rdd.map(r => if (a==10) 1 else 0)
This throws -

java.lang.StackOverflowError
    at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
    at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
...
...

More experiments  .. this works -

val lst = Range(0,10000).map(i=>("10","10",i:Double)).toList
sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)

But below doesn't and throws the StackoverflowError -

val lst = MutableList[(String,String,Double)]()
Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)

Any help appreciated!

Thanks,
Ashish



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark shell and StackOverFlowError

Posted by Ted Yu <yu...@gmail.com>.
I used the notation on JIRA where bq means quote.

FYI

On Mon, Aug 31, 2015 at 12:34 PM, Ashish Shrowty <as...@gmail.com>
wrote:

> Yes .. I am closing the stream.
>
> Not sure what you meant by "bq. and then create rdd"?
>
> -Ashish
>
> On Mon, Aug 31, 2015 at 1:02 PM Ted Yu <yu...@gmail.com> wrote:
>
>> I am not familiar with your code.
>>
>> bq. and then create the rdd
>>
>> I assume you call ObjectOutputStream.close() prior to the above step.
>>
>> Cheers
>>
>> On Mon, Aug 31, 2015 at 9:42 AM, Ashish Shrowty <ashish.shrowty@gmail.com
>> > wrote:
>>
>>> Sure .. here it is (scroll below to see the NotSerializableException).
>>> Note that upstream, I do load up the (user,item,ratings) data from a file
>>> using ObjectInputStream, do some calculations that I put in a map and then
>>> create the rdd used in the code above from that map. I even tried
>>> checkpointing the rdd and persisting it to break any lineage to the
>>> original ObjectInputStream (if that was what was happening) -
>>>
>>> org.apache.spark.SparkException: Task not serializable
>>>
>>> at
>>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>>
>>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>>
>>> at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
>>>
>>> at org.apache.spark.rdd.RDD.flatMap(RDD.scala:295)
>>>
>>> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
>>>
>>> at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
>>>
>>> at $iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
>>>
>>> at $iwC$$iwC$$iwC.<init>(<console>:50)
>>>
>>> at $iwC$$iwC.<init>(<console>:52)
>>>
>>> at $iwC.<init>(<console>:54)
>>>
>>> at <init>(<console>:56)
>>>
>>> at .<init>(<console>:60)
>>>
>>> at .<clinit>(<console>)
>>>
>>> at .<init>(<console>:7)
>>>
>>> at .<clinit>(<console>)
>>>
>>> at $print(<console>)
>>>
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>
>>> at
>>> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
>>>
>>> at
>>> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
>>>
>>> at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
>>>
>>> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
>>>
>>> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
>>>
>>> at org.apache.spark.repl.SparkILoop.pasteCommand(SparkILoop.scala:796)
>>>
>>> at
>>> org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:321)
>>>
>>> at
>>> org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:321)
>>>
>>> at
>>> scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
>>>
>>> at
>>> scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
>>>
>>> at
>>> scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76)
>>>
>>> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
>>>
>>> at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
>>>
>>> at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
>>>
>>> at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
>>>
>>> at
>>> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
>>>
>>> at
>>> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
>>>
>>> at
>>> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
>>>
>>> at
>>> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>>>
>>> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
>>>
>>> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
>>>
>>> at org.apache.spark.repl.Main$.main(Main.scala:31)
>>>
>>> at org.apache.spark.repl.Main.main(Main.scala)
>>>
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>>
>>> at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>>
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>>
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>
>>> *Caused by: java.io.NotSerializableException: java.io.ObjectInputStream*
>>>
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>>
>>> at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>
>>> at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>
>>> at
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>
>>> at
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>
>>> at
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>
>>> ...
>>>
>>> ...
>>>
>>> On Mon, Aug 31, 2015 at 12:23 PM Ted Yu <yu...@gmail.com> wrote:
>>>
>>>> Ashish:
>>>> Can you post the complete stack trace for NotSerializableException ?
>>>>
>>>> Cheers
>>>>
>>>> On Mon, Aug 31, 2015 at 8:49 AM, Ashish Shrowty <
>>>> ashish.shrowty@gmail.com> wrote:
>>>>
>>>>> bcItemsIdx is just a broadcast variable constructed out of
>>>>> Array[(String)] .. it holds the item ids and I use it for indexing the
>>>>> MatrixEntry objects
>>>>>
>>>>>
>>>>> On Mon, Aug 31, 2015 at 10:41 AM Sean Owen <so...@cloudera.com> wrote:
>>>>>
>>>>>> It's not clear; that error is different still and somehow suggests
>>>>>> you're serializing a stream somewhere. I'd look at what's inside
>>>>>> bcItemsIdx as that is not shown here.
>>>>>>
>>>>>> On Mon, Aug 31, 2015 at 3:34 PM, Ashish Shrowty
>>>>>>
>>>>>> <as...@gmail.com> wrote:
>>>>>> > Sean,
>>>>>> >
>>>>>> > Thanks for your comments. What I was really trying to do was to
>>>>>> transform a
>>>>>> > RDD[(userid,itemid,ratings)] into a RowMatrix so that I can do some
>>>>>> column
>>>>>> > similarity calculations while exploring the data before building
>>>>>> some
>>>>>> > models. But to do that I need to first convert the user and item
>>>>>> ids into
>>>>>> > respective indexes where I intended on passing in an array into the
>>>>>> closure,
>>>>>> > which is where I got stuck with this overflowerror trying to figure
>>>>>> out
>>>>>> > where it is happening. The actual error I got was slightly
>>>>>> different (Caused
>>>>>> > by: java.io.NotSerializableException: java.io.ObjectInputStream).
>>>>>> I started
>>>>>> > investigating this issue which led me to the earlier code snippet
>>>>>> that I had
>>>>>> > posted. This is again because of the bcItemsIdx variable being
>>>>>> passed into
>>>>>> > the closure. Below code works if I don't pass in the variable and
>>>>>> use simply
>>>>>> > a constant like 10 in its place .. The code thus far -
>>>>>> >
>>>>>> > // rdd below is RDD[(String,String,Double)]
>>>>>> > // bcItemsIdx below is Broadcast[Array[String]] which is an array
>>>>>> of item
>>>>>> > ids
>>>>>> > val gRdd = rdd.map{case(user,item,rating) =>
>>>>>> > ((user),(item,rating))}.groupByKey
>>>>>> > val idxRdd = gRdd.zipWithIndex
>>>>>> > val cm = new CoordinateMatrix(
>>>>>> >     idxRdd.flatMap[MatrixEntry](e => {
>>>>>> >         e._1._2.map(item=> {
>>>>>> >                  MatrixEntry(e._2,
>>>>>> bcItemsIdx.value.indexOf(item._1),
>>>>>> > item._2) // <- This is where I get the Serialization error passing
>>>>>> in the
>>>>>> > index
>>>>>> >                  // MatrixEntry(e._2, 10, item._2) // <- This works
>>>>>> >         })
>>>>>> >     })
>>>>>> > )
>>>>>> > val rm = cm.toRowMatrix
>>>>>> > val simMatrix = rm.columnSimilarities()
>>>>>> >
>>>>>> > I would like to make this work in the Spark shell as I am still
>>>>>> exploring
>>>>>> > the data. Let me know if there is an alternate way of constructing
>>>>>> the
>>>>>> > RowMatrix.
>>>>>> >
>>>>>> > Thanks and appreciate all the help!
>>>>>> >
>>>>>> > Ashish
>>>>>> >
>>>>>> > On Mon, Aug 31, 2015 at 3:41 AM Sean Owen <so...@cloudera.com>
>>>>>> wrote:
>>>>>> >>
>>>>>> >> Yeah I see that now. I think it fails immediately because the map
>>>>>> >> operation does try to clean and/or verify the serialization of the
>>>>>> >> closure upfront.
>>>>>> >>
>>>>>> >> I'm not quite sure what is going on, but I think it's some strange
>>>>>> >> interaction between how you're building up the list and what the
>>>>>> >> resulting representation happens to be like, and how the closure
>>>>>> >> cleaner works, which can't be perfect. The shell also introduces an
>>>>>> >> extra layer of issues.
>>>>>> >>
>>>>>> >> For example, the slightly more canonical approaches work fine:
>>>>>> >>
>>>>>> >> import scala.collection.mutable.MutableList
>>>>>> >> val lst = MutableList[(String,String,Double)]()
>>>>>> >> (0 to 10000).foreach(i => lst :+ ("10", "10", i.toDouble))
>>>>>> >>
>>>>>> >> or just
>>>>>> >>
>>>>>> >> val lst = (0 to 10000).map(i => ("10", "10", i.toDouble))
>>>>>> >>
>>>>>> >> If you just need this to work, maybe those are better alternatives
>>>>>> anyway.
>>>>>> >> You can also check whether it works without the shell, as I suspect
>>>>>> >> that's a factor.
>>>>>> >>
>>>>>> >> It's not an error in Spark per se but saying that something's
>>>>>> default
>>>>>> >> Java serialization graph is very deep, so it's like the code you
>>>>>> wrote
>>>>>> >> plus the closure cleaner ends up pulling in some huge linked list
>>>>>> and
>>>>>> >> serializing it the direct and unuseful way.
>>>>>> >>
>>>>>> >> If you have an idea about exactly why it's happening you can open a
>>>>>> >> JIRA, but arguably it's something that's nice to just work but
>>>>>> isn't
>>>>>> >> to do with Spark per se. Or, have a look at others related to the
>>>>>> >> closure and shell and you may find this is related to other known
>>>>>> >> behavior.
>>>>>> >>
>>>>>> >>
>>>>>> >> On Sun, Aug 30, 2015 at 8:08 PM, Ashish Shrowty
>>>>>> >> <as...@gmail.com> wrote:
>>>>>> >> > Sean .. does the code below work for you in the Spark shell? Ted
>>>>>> got the
>>>>>> >> > same error -
>>>>>> >> >
>>>>>> >> > val a=10
>>>>>> >> > val lst = MutableList[(String,String,Double)]()
>>>>>> >> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>>>>> >> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>>> >> >
>>>>>> >> > -Ashish
>>>>>> >> >
>>>>>> >> >
>>>>>> >> > On Sun, Aug 30, 2015 at 2:52 PM Sean Owen <so...@cloudera.com>
>>>>>> wrote:
>>>>>> >> >>
>>>>>> >> >> I'm not sure how to reproduce it? this code does not produce an
>>>>>> error
>>>>>> >> >> in
>>>>>> >> >> master.
>>>>>> >> >>
>>>>>> >> >> On Sun, Aug 30, 2015 at 7:26 PM, Ashish Shrowty
>>>>>> >> >> <as...@gmail.com> wrote:
>>>>>> >> >> > Do you think I should create a JIRA?
>>>>>> >> >> >
>>>>>> >> >> >
>>>>>> >> >> > On Sun, Aug 30, 2015 at 12:56 PM Ted Yu <yu...@gmail.com>
>>>>>> wrote:
>>>>>> >> >> >>
>>>>>> >> >> >> I got StackOverFlowError as well :-(
>>>>>> >> >> >>
>>>>>> >> >> >> On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty
>>>>>> >> >> >> <as...@gmail.com>
>>>>>> >> >> >> wrote:
>>>>>> >> >> >>>
>>>>>> >> >> >>> Yep .. I tried that too earlier. Doesn't make a difference.
>>>>>> Are you
>>>>>> >> >> >>> able
>>>>>> >> >> >>> to replicate on your side?
>>>>>> >> >> >>>
>>>>>> >> >> >>>
>>>>>> >> >> >>> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <
>>>>>> yuzhihong@gmail.com>
>>>>>> >> >> >>> wrote:
>>>>>> >> >> >>>>
>>>>>> >> >> >>>> I see.
>>>>>> >> >> >>>>
>>>>>> >> >> >>>> What about using the following in place of variable a ?
>>>>>> >> >> >>>>
>>>>>> >> >> >>>>
>>>>>> >> >> >>>>
>>>>>> >> >> >>>>
>>>>>> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>>>>>> >> >> >>>>
>>>>>> >> >> >>>> Cheers
>>>>>> >> >> >>>>
>>>>>> >> >> >>>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty
>>>>>> >> >> >>>> <as...@gmail.com> wrote:
>>>>>> >> >> >>>>>
>>>>>> >> >> >>>>> @Sean - Agree that there is no action, but I still get the
>>>>>> >> >> >>>>> stackoverflowerror, its very weird
>>>>>> >> >> >>>>>
>>>>>> >> >> >>>>> @Ted - Variable a is just an int - val a = 10 ... The
>>>>>> error
>>>>>> >> >> >>>>> happens
>>>>>> >> >> >>>>> when I try to pass a variable into the closure. The
>>>>>> example you
>>>>>> >> >> >>>>> have
>>>>>> >> >> >>>>> above
>>>>>> >> >> >>>>> works fine since there is no variable being passed into
>>>>>> the
>>>>>> >> >> >>>>> closure
>>>>>> >> >> >>>>> from the
>>>>>> >> >> >>>>> shell.
>>>>>> >> >> >>>>>
>>>>>> >> >> >>>>> -Ashish
>>>>>> >> >> >>>>>
>>>>>> >> >> >>>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <
>>>>>> yuzhihong@gmail.com>
>>>>>> >> >> >>>>> wrote:
>>>>>> >> >> >>>>>>
>>>>>> >> >> >>>>>> Using Spark shell :
>>>>>> >> >> >>>>>>
>>>>>> >> >> >>>>>> scala> import scala.collection.mutable.MutableList
>>>>>> >> >> >>>>>> import scala.collection.mutable.MutableList
>>>>>> >> >> >>>>>>
>>>>>> >> >> >>>>>> scala> val lst = MutableList[(String,String,Double)]()
>>>>>> >> >> >>>>>> lst: scala.collection.mutable.MutableList[(String,
>>>>>> String,
>>>>>> >> >> >>>>>> Double)]
>>>>>> >> >> >>>>>> =
>>>>>> >> >> >>>>>> MutableList()
>>>>>> >> >> >>>>>>
>>>>>> >> >> >>>>>> scala>
>>>>>> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>>>>> >> >> >>>>>>
>>>>>> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else
>>>>>> 0)
>>>>>> >> >> >>>>>> <console>:27: error: not found: value a
>>>>>> >> >> >>>>>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else
>>>>>> 0)
>>>>>> >> >> >>>>>>                                           ^
>>>>>> >> >> >>>>>>
>>>>>> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1
>>>>>> else 0)
>>>>>> >> >> >>>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1]
>>>>>> at map
>>>>>> >> >> >>>>>> at
>>>>>> >> >> >>>>>> <console>:27
>>>>>> >> >> >>>>>>
>>>>>> >> >> >>>>>> scala> rdd.count()
>>>>>> >> >> >>>>>> ...
>>>>>> >> >> >>>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished:
>>>>>> count at
>>>>>> >> >> >>>>>> <console>:30, took 0.478350 s
>>>>>> >> >> >>>>>> res1: Long = 10000
>>>>>> >> >> >>>>>>
>>>>>> >> >> >>>>>> Ashish:
>>>>>> >> >> >>>>>> Please refine your example to mimic more closely what
>>>>>> your code
>>>>>> >> >> >>>>>> actually did.
>>>>>> >> >> >>>>>>
>>>>>> >> >> >>>>>> Thanks
>>>>>> >> >> >>>>>>
>>>>>> >> >> >>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <
>>>>>> sowen@cloudera.com>
>>>>>> >> >> >>>>>> wrote:
>>>>>> >> >> >>>>>>>
>>>>>> >> >> >>>>>>> That can't cause any error, since there is no action in
>>>>>> your
>>>>>> >> >> >>>>>>> first
>>>>>> >> >> >>>>>>> snippet. Even calling count on the result doesn't cause
>>>>>> an
>>>>>> >> >> >>>>>>> error.
>>>>>> >> >> >>>>>>> You
>>>>>> >> >> >>>>>>> must be executing something different.
>>>>>> >> >> >>>>>>>
>>>>>> >> >> >>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty
>>>>>> >> >> >>>>>>> <as...@gmail.com>
>>>>>> >> >> >>>>>>> wrote:
>>>>>> >> >> >>>>>>> > I am running the Spark shell (1.2.1) in local mode
>>>>>> and I have
>>>>>> >> >> >>>>>>> > a
>>>>>> >> >> >>>>>>> > simple
>>>>>> >> >> >>>>>>> > RDD[(String,String,Double)] with about 10,000 objects
>>>>>> in it.
>>>>>> >> >> >>>>>>> > I
>>>>>> >> >> >>>>>>> > get
>>>>>> >> >> >>>>>>> > a
>>>>>> >> >> >>>>>>> > StackOverFlowError each time I try to run the
>>>>>> following code
>>>>>> >> >> >>>>>>> > (the
>>>>>> >> >> >>>>>>> > code
>>>>>> >> >> >>>>>>> > itself is just representative of other logic where I
>>>>>> need to
>>>>>> >> >> >>>>>>> > pass
>>>>>> >> >> >>>>>>> > in a
>>>>>> >> >> >>>>>>> > variable). I tried broadcasting the variable too, but
>>>>>> no luck
>>>>>> >> >> >>>>>>> > ..
>>>>>> >> >> >>>>>>> > missing
>>>>>> >> >> >>>>>>> > something basic here -
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>)
>>>>>> >> >> >>>>>>> > val a=10
>>>>>> >> >> >>>>>>> > rdd.map(r => if (a==10) 1 else 0)
>>>>>> >> >> >>>>>>> > This throws -
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> > java.lang.StackOverflowError
>>>>>> >> >> >>>>>>> >     at
>>>>>> >> >> >>>>>>> >
>>>>>> java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
>>>>>> >> >> >>>>>>> >     at
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
>>>>>> >> >> >>>>>>> >     at
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>> >> >> >>>>>>> >     at
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>> >> >> >>>>>>> >     at
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>> >> >> >>>>>>> >     at
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>> >> >> >>>>>>> >     at
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>> >> >> >>>>>>> >     at
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>> >> >> >>>>>>> >     at
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>> >> >> >>>>>>> > ...
>>>>>> >> >> >>>>>>> > ...
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> > More experiments  .. this works -
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> > val lst =
>>>>>> Range(0,10000).map(i=>("10","10",i:Double)).toList
>>>>>> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> > But below doesn't and throws the StackoverflowError -
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> > val lst = MutableList[(String,String,Double)]()
>>>>>> >> >> >>>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>>>>> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> > Any help appreciated!
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> > Thanks,
>>>>>> >> >> >>>>>>> > Ashish
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> > --
>>>>>> >> >> >>>>>>> > View this message in context:
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
>>>>>> >> >> >>>>>>> > Sent from the Apache Spark User List mailing list
>>>>>> archive at
>>>>>> >> >> >>>>>>> > Nabble.com.
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>> >
>>>>>> ---------------------------------------------------------------------
>>>>>> >> >> >>>>>>> > To unsubscribe, e-mail:
>>>>>> user-unsubscribe@spark.apache.org
>>>>>> >> >> >>>>>>> > For additional commands, e-mail:
>>>>>> user-help@spark.apache.org
>>>>>> >> >> >>>>>>> >
>>>>>> >> >> >>>>>>>
>>>>>> >> >> >>>>>>>
>>>>>> >> >> >>>>>>>
>>>>>> >> >> >>>>>>>
>>>>>> ---------------------------------------------------------------------
>>>>>> >> >> >>>>>>> To unsubscribe, e-mail:
>>>>>> user-unsubscribe@spark.apache.org
>>>>>> >> >> >>>>>>> For additional commands, e-mail:
>>>>>> user-help@spark.apache.org
>>>>>> >> >> >>>>>>>
>>>>>> >> >> >>>>>>
>>>>>> >> >> >>>>
>>>>>> >> >> >>
>>>>>> >> >> >
>>>>>>
>>>>>
>>>>
>>

Re: Spark shell and StackOverFlowError

Posted by Ashish Shrowty <as...@gmail.com>.
Yes .. I am closing the stream.

Not sure what you meant by "bq. and then create rdd"?

-Ashish

On Mon, Aug 31, 2015 at 1:02 PM Ted Yu <yu...@gmail.com> wrote:

> I am not familiar with your code.
>
> bq. and then create the rdd
>
> I assume you call ObjectOutputStream.close() prior to the above step.
>
> Cheers
>
> On Mon, Aug 31, 2015 at 9:42 AM, Ashish Shrowty <as...@gmail.com>
> wrote:
>
>> Sure .. here it is (scroll below to see the NotSerializableException).
>> Note that upstream, I do load up the (user,item,ratings) data from a file
>> using ObjectInputStream, do some calculations that I put in a map and then
>> create the rdd used in the code above from that map. I even tried
>> checkpointing the rdd and persisting it to break any lineage to the
>> original ObjectInputStream (if that was what was happening) -
>>
>> org.apache.spark.SparkException: Task not serializable
>>
>> at
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>
>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>
>> at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
>>
>> at org.apache.spark.rdd.RDD.flatMap(RDD.scala:295)
>>
>> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
>>
>> at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
>>
>> at $iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
>>
>> at $iwC$$iwC$$iwC.<init>(<console>:50)
>>
>> at $iwC$$iwC.<init>(<console>:52)
>>
>> at $iwC.<init>(<console>:54)
>>
>> at <init>(<console>:56)
>>
>> at .<init>(<console>:60)
>>
>> at .<clinit>(<console>)
>>
>> at .<init>(<console>:7)
>>
>> at .<clinit>(<console>)
>>
>> at $print(<console>)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:606)
>>
>> at
>> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
>>
>> at
>> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
>>
>> at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
>>
>> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
>>
>> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
>>
>> at org.apache.spark.repl.SparkILoop.pasteCommand(SparkILoop.scala:796)
>>
>> at
>> org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:321)
>>
>> at
>> org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:321)
>>
>> at
>> scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
>>
>> at
>> scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
>>
>> at
>> scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76)
>>
>> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
>>
>> at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
>>
>> at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
>>
>> at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
>>
>> at
>> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
>>
>> at
>> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
>>
>> at
>> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
>>
>> at
>> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>>
>> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
>>
>> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
>>
>> at org.apache.spark.repl.Main$.main(Main.scala:31)
>>
>> at org.apache.spark.repl.Main.main(Main.scala)
>>
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>
>> at java.lang.reflect.Method.invoke(Method.java:606)
>>
>> at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> *Caused by: java.io.NotSerializableException: java.io.ObjectInputStream*
>>
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>
>> at
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>
>> at
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>
>> at
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>
>> at
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>
>> at
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>
>> ...
>>
>> ...
>>
>> On Mon, Aug 31, 2015 at 12:23 PM Ted Yu <yu...@gmail.com> wrote:
>>
>>> Ashish:
>>> Can you post the complete stack trace for NotSerializableException ?
>>>
>>> Cheers
>>>
>>> On Mon, Aug 31, 2015 at 8:49 AM, Ashish Shrowty <
>>> ashish.shrowty@gmail.com> wrote:
>>>
>>>> bcItemsIdx is just a broadcast variable constructed out of
>>>> Array[(String)] .. it holds the item ids and I use it for indexing the
>>>> MatrixEntry objects
>>>>
>>>>
>>>> On Mon, Aug 31, 2015 at 10:41 AM Sean Owen <so...@cloudera.com> wrote:
>>>>
>>>>> It's not clear; that error is different still and somehow suggests
>>>>> you're serializing a stream somewhere. I'd look at what's inside
>>>>> bcItemsIdx as that is not shown here.
>>>>>
>>>>> On Mon, Aug 31, 2015 at 3:34 PM, Ashish Shrowty
>>>>>
>>>>> <as...@gmail.com> wrote:
>>>>> > Sean,
>>>>> >
>>>>> > Thanks for your comments. What I was really trying to do was to
>>>>> transform a
>>>>> > RDD[(userid,itemid,ratings)] into a RowMatrix so that I can do some
>>>>> column
>>>>> > similarity calculations while exploring the data before building some
>>>>> > models. But to do that I need to first convert the user and item ids
>>>>> into
>>>>> > respective indexes where I intended on passing in an array into the
>>>>> closure,
>>>>> > which is where I got stuck with this overflowerror trying to figure
>>>>> out
>>>>> > where it is happening. The actual error I got was slightly different
>>>>> (Caused
>>>>> > by: java.io.NotSerializableException: java.io.ObjectInputStream). I
>>>>> started
>>>>> > investigating this issue which led me to the earlier code snippet
>>>>> that I had
>>>>> > posted. This is again because of the bcItemsIdx variable being
>>>>> passed into
>>>>> > the closure. Below code works if I don't pass in the variable and
>>>>> use simply
>>>>> > a constant like 10 in its place .. The code thus far -
>>>>> >
>>>>> > // rdd below is RDD[(String,String,Double)]
>>>>> > // bcItemsIdx below is Broadcast[Array[String]] which is an array of
>>>>> item
>>>>> > ids
>>>>> > val gRdd = rdd.map{case(user,item,rating) =>
>>>>> > ((user),(item,rating))}.groupByKey
>>>>> > val idxRdd = gRdd.zipWithIndex
>>>>> > val cm = new CoordinateMatrix(
>>>>> >     idxRdd.flatMap[MatrixEntry](e => {
>>>>> >         e._1._2.map(item=> {
>>>>> >                  MatrixEntry(e._2, bcItemsIdx.value.indexOf(item._1),
>>>>> > item._2) // <- This is where I get the Serialization error passing
>>>>> in the
>>>>> > index
>>>>> >                  // MatrixEntry(e._2, 10, item._2) // <- This works
>>>>> >         })
>>>>> >     })
>>>>> > )
>>>>> > val rm = cm.toRowMatrix
>>>>> > val simMatrix = rm.columnSimilarities()
>>>>> >
>>>>> > I would like to make this work in the Spark shell as I am still
>>>>> exploring
>>>>> > the data. Let me know if there is an alternate way of constructing
>>>>> the
>>>>> > RowMatrix.
>>>>> >
>>>>> > Thanks and appreciate all the help!
>>>>> >
>>>>> > Ashish
>>>>> >
>>>>> > On Mon, Aug 31, 2015 at 3:41 AM Sean Owen <so...@cloudera.com>
>>>>> wrote:
>>>>> >>
>>>>> >> Yeah I see that now. I think it fails immediately because the map
>>>>> >> operation does try to clean and/or verify the serialization of the
>>>>> >> closure upfront.
>>>>> >>
>>>>> >> I'm not quite sure what is going on, but I think it's some strange
>>>>> >> interaction between how you're building up the list and what the
>>>>> >> resulting representation happens to be like, and how the closure
>>>>> >> cleaner works, which can't be perfect. The shell also introduces an
>>>>> >> extra layer of issues.
>>>>> >>
>>>>> >> For example, the slightly more canonical approaches work fine:
>>>>> >>
>>>>> >> import scala.collection.mutable.MutableList
>>>>> >> val lst = MutableList[(String,String,Double)]()
>>>>> >> (0 to 10000).foreach(i => lst :+ ("10", "10", i.toDouble))
>>>>> >>
>>>>> >> or just
>>>>> >>
>>>>> >> val lst = (0 to 10000).map(i => ("10", "10", i.toDouble))
>>>>> >>
>>>>> >> If you just need this to work, maybe those are better alternatives
>>>>> anyway.
>>>>> >> You can also check whether it works without the shell, as I suspect
>>>>> >> that's a factor.
>>>>> >>
>>>>> >> It's not an error in Spark per se but saying that something's
>>>>> default
>>>>> >> Java serialization graph is very deep, so it's like the code you
>>>>> wrote
>>>>> >> plus the closure cleaner ends up pulling in some huge linked list
>>>>> and
>>>>> >> serializing it the direct and unuseful way.
>>>>> >>
>>>>> >> If you have an idea about exactly why it's happening you can open a
>>>>> >> JIRA, but arguably it's something that's nice to just work but isn't
>>>>> >> to do with Spark per se. Or, have a look at others related to the
>>>>> >> closure and shell and you may find this is related to other known
>>>>> >> behavior.
>>>>> >>
>>>>> >>
>>>>> >> On Sun, Aug 30, 2015 at 8:08 PM, Ashish Shrowty
>>>>> >> <as...@gmail.com> wrote:
>>>>> >> > Sean .. does the code below work for you in the Spark shell? Ted
>>>>> got the
>>>>> >> > same error -
>>>>> >> >
>>>>> >> > val a=10
>>>>> >> > val lst = MutableList[(String,String,Double)]()
>>>>> >> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>>>> >> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>> >> >
>>>>> >> > -Ashish
>>>>> >> >
>>>>> >> >
>>>>> >> > On Sun, Aug 30, 2015 at 2:52 PM Sean Owen <so...@cloudera.com>
>>>>> wrote:
>>>>> >> >>
>>>>> >> >> I'm not sure how to reproduce it? this code does not produce an
>>>>> error
>>>>> >> >> in
>>>>> >> >> master.
>>>>> >> >>
>>>>> >> >> On Sun, Aug 30, 2015 at 7:26 PM, Ashish Shrowty
>>>>> >> >> <as...@gmail.com> wrote:
>>>>> >> >> > Do you think I should create a JIRA?
>>>>> >> >> >
>>>>> >> >> >
>>>>> >> >> > On Sun, Aug 30, 2015 at 12:56 PM Ted Yu <yu...@gmail.com>
>>>>> wrote:
>>>>> >> >> >>
>>>>> >> >> >> I got StackOverFlowError as well :-(
>>>>> >> >> >>
>>>>> >> >> >> On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty
>>>>> >> >> >> <as...@gmail.com>
>>>>> >> >> >> wrote:
>>>>> >> >> >>>
>>>>> >> >> >>> Yep .. I tried that too earlier. Doesn't make a difference.
>>>>> Are you
>>>>> >> >> >>> able
>>>>> >> >> >>> to replicate on your side?
>>>>> >> >> >>>
>>>>> >> >> >>>
>>>>> >> >> >>> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <yuzhihong@gmail.com
>>>>> >
>>>>> >> >> >>> wrote:
>>>>> >> >> >>>>
>>>>> >> >> >>>> I see.
>>>>> >> >> >>>>
>>>>> >> >> >>>> What about using the following in place of variable a ?
>>>>> >> >> >>>>
>>>>> >> >> >>>>
>>>>> >> >> >>>>
>>>>> >> >> >>>>
>>>>> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>>>>> >> >> >>>>
>>>>> >> >> >>>> Cheers
>>>>> >> >> >>>>
>>>>> >> >> >>>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty
>>>>> >> >> >>>> <as...@gmail.com> wrote:
>>>>> >> >> >>>>>
>>>>> >> >> >>>>> @Sean - Agree that there is no action, but I still get the
>>>>> >> >> >>>>> stackoverflowerror, its very weird
>>>>> >> >> >>>>>
>>>>> >> >> >>>>> @Ted - Variable a is just an int - val a = 10 ... The error
>>>>> >> >> >>>>> happens
>>>>> >> >> >>>>> when I try to pass a variable into the closure. The
>>>>> example you
>>>>> >> >> >>>>> have
>>>>> >> >> >>>>> above
>>>>> >> >> >>>>> works fine since there is no variable being passed into the
>>>>> >> >> >>>>> closure
>>>>> >> >> >>>>> from the
>>>>> >> >> >>>>> shell.
>>>>> >> >> >>>>>
>>>>> >> >> >>>>> -Ashish
>>>>> >> >> >>>>>
>>>>> >> >> >>>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <
>>>>> yuzhihong@gmail.com>
>>>>> >> >> >>>>> wrote:
>>>>> >> >> >>>>>>
>>>>> >> >> >>>>>> Using Spark shell :
>>>>> >> >> >>>>>>
>>>>> >> >> >>>>>> scala> import scala.collection.mutable.MutableList
>>>>> >> >> >>>>>> import scala.collection.mutable.MutableList
>>>>> >> >> >>>>>>
>>>>> >> >> >>>>>> scala> val lst = MutableList[(String,String,Double)]()
>>>>> >> >> >>>>>> lst: scala.collection.mutable.MutableList[(String, String,
>>>>> >> >> >>>>>> Double)]
>>>>> >> >> >>>>>> =
>>>>> >> >> >>>>>> MutableList()
>>>>> >> >> >>>>>>
>>>>> >> >> >>>>>> scala>
>>>>> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>>>> >> >> >>>>>>
>>>>> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>> >> >> >>>>>> <console>:27: error: not found: value a
>>>>> >> >> >>>>>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>> >> >> >>>>>>                                           ^
>>>>> >> >> >>>>>>
>>>>> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1
>>>>> else 0)
>>>>> >> >> >>>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1]
>>>>> at map
>>>>> >> >> >>>>>> at
>>>>> >> >> >>>>>> <console>:27
>>>>> >> >> >>>>>>
>>>>> >> >> >>>>>> scala> rdd.count()
>>>>> >> >> >>>>>> ...
>>>>> >> >> >>>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished:
>>>>> count at
>>>>> >> >> >>>>>> <console>:30, took 0.478350 s
>>>>> >> >> >>>>>> res1: Long = 10000
>>>>> >> >> >>>>>>
>>>>> >> >> >>>>>> Ashish:
>>>>> >> >> >>>>>> Please refine your example to mimic more closely what
>>>>> your code
>>>>> >> >> >>>>>> actually did.
>>>>> >> >> >>>>>>
>>>>> >> >> >>>>>> Thanks
>>>>> >> >> >>>>>>
>>>>> >> >> >>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <
>>>>> sowen@cloudera.com>
>>>>> >> >> >>>>>> wrote:
>>>>> >> >> >>>>>>>
>>>>> >> >> >>>>>>> That can't cause any error, since there is no action in
>>>>> your
>>>>> >> >> >>>>>>> first
>>>>> >> >> >>>>>>> snippet. Even calling count on the result doesn't cause
>>>>> an
>>>>> >> >> >>>>>>> error.
>>>>> >> >> >>>>>>> You
>>>>> >> >> >>>>>>> must be executing something different.
>>>>> >> >> >>>>>>>
>>>>> >> >> >>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty
>>>>> >> >> >>>>>>> <as...@gmail.com>
>>>>> >> >> >>>>>>> wrote:
>>>>> >> >> >>>>>>> > I am running the Spark shell (1.2.1) in local mode and
>>>>> I have
>>>>> >> >> >>>>>>> > a
>>>>> >> >> >>>>>>> > simple
>>>>> >> >> >>>>>>> > RDD[(String,String,Double)] with about 10,000 objects
>>>>> in it.
>>>>> >> >> >>>>>>> > I
>>>>> >> >> >>>>>>> > get
>>>>> >> >> >>>>>>> > a
>>>>> >> >> >>>>>>> > StackOverFlowError each time I try to run the
>>>>> following code
>>>>> >> >> >>>>>>> > (the
>>>>> >> >> >>>>>>> > code
>>>>> >> >> >>>>>>> > itself is just representative of other logic where I
>>>>> need to
>>>>> >> >> >>>>>>> > pass
>>>>> >> >> >>>>>>> > in a
>>>>> >> >> >>>>>>> > variable). I tried broadcasting the variable too, but
>>>>> no luck
>>>>> >> >> >>>>>>> > ..
>>>>> >> >> >>>>>>> > missing
>>>>> >> >> >>>>>>> > something basic here -
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>)
>>>>> >> >> >>>>>>> > val a=10
>>>>> >> >> >>>>>>> > rdd.map(r => if (a==10) 1 else 0)
>>>>> >> >> >>>>>>> > This throws -
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> > java.lang.StackOverflowError
>>>>> >> >> >>>>>>> >     at
>>>>> >> >> >>>>>>> >
>>>>> java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
>>>>> >> >> >>>>>>> >     at
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
>>>>> >> >> >>>>>>> >     at
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>> >> >> >>>>>>> >     at
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>> >> >> >>>>>>> >     at
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>> >> >> >>>>>>> >     at
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>> >> >> >>>>>>> >     at
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>> >> >> >>>>>>> >     at
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>> >> >> >>>>>>> >     at
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>> >> >> >>>>>>> > ...
>>>>> >> >> >>>>>>> > ...
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> > More experiments  .. this works -
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> > val lst =
>>>>> Range(0,10000).map(i=>("10","10",i:Double)).toList
>>>>> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> > But below doesn't and throws the StackoverflowError -
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> > val lst = MutableList[(String,String,Double)]()
>>>>> >> >> >>>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>>>> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> > Any help appreciated!
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> > Thanks,
>>>>> >> >> >>>>>>> > Ashish
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> > --
>>>>> >> >> >>>>>>> > View this message in context:
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
>>>>> >> >> >>>>>>> > Sent from the Apache Spark User List mailing list
>>>>> archive at
>>>>> >> >> >>>>>>> > Nabble.com.
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>> >
>>>>> ---------------------------------------------------------------------
>>>>> >> >> >>>>>>> > To unsubscribe, e-mail:
>>>>> user-unsubscribe@spark.apache.org
>>>>> >> >> >>>>>>> > For additional commands, e-mail:
>>>>> user-help@spark.apache.org
>>>>> >> >> >>>>>>> >
>>>>> >> >> >>>>>>>
>>>>> >> >> >>>>>>>
>>>>> >> >> >>>>>>>
>>>>> >> >> >>>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> >> >> >>>>>>> To unsubscribe, e-mail:
>>>>> user-unsubscribe@spark.apache.org
>>>>> >> >> >>>>>>> For additional commands, e-mail:
>>>>> user-help@spark.apache.org
>>>>> >> >> >>>>>>>
>>>>> >> >> >>>>>>
>>>>> >> >> >>>>
>>>>> >> >> >>
>>>>> >> >> >
>>>>>
>>>>
>>>
>

Re: Spark shell and StackOverFlowError

Posted by Ted Yu <yu...@gmail.com>.
I am not familiar with your code.

bq. and then create the rdd

I assume you call ObjectOutputStream.close() prior to the above step.

Cheers

On Mon, Aug 31, 2015 at 9:42 AM, Ashish Shrowty <as...@gmail.com>
wrote:

> Sure .. here it is (scroll below to see the NotSerializableException).
> Note that upstream, I do load up the (user,item,ratings) data from a file
> using ObjectInputStream, do some calculations that I put in a map and then
> create the rdd used in the code above from that map. I even tried
> checkpointing the rdd and persisting it to break any lineage to the
> original ObjectInputStream (if that was what was happening) -
>
> org.apache.spark.SparkException: Task not serializable
>
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>
> at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
>
> at org.apache.spark.rdd.RDD.flatMap(RDD.scala:295)
>
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
>
> at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
>
> at $iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
>
> at $iwC$$iwC$$iwC.<init>(<console>:50)
>
> at $iwC$$iwC.<init>(<console>:52)
>
> at $iwC.<init>(<console>:54)
>
> at <init>(<console>:56)
>
> at .<init>(<console>:60)
>
> at .<clinit>(<console>)
>
> at .<init>(<console>:7)
>
> at .<clinit>(<console>)
>
> at $print(<console>)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:606)
>
> at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
>
> at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
>
> at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
>
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
>
> at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
>
> at org.apache.spark.repl.SparkILoop.pasteCommand(SparkILoop.scala:796)
>
> at
> org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:321)
>
> at
> org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:321)
>
> at
> scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
>
> at
> scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
>
> at
> scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76)
>
> at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
>
> at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
>
> at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
>
> at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
>
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
>
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
>
> at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
>
> at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
>
> at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
>
> at org.apache.spark.repl.Main$.main(Main.scala:31)
>
> at org.apache.spark.repl.Main.main(Main.scala)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:606)
>
> at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> *Caused by: java.io.NotSerializableException: java.io.ObjectInputStream*
>
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>
> ...
>
> ...
>
> On Mon, Aug 31, 2015 at 12:23 PM Ted Yu <yu...@gmail.com> wrote:
>
>> Ashish:
>> Can you post the complete stack trace for NotSerializableException ?
>>
>> Cheers
>>
>> On Mon, Aug 31, 2015 at 8:49 AM, Ashish Shrowty <ashish.shrowty@gmail.com
>> > wrote:
>>
>>> bcItemsIdx is just a broadcast variable constructed out of
>>> Array[(String)] .. it holds the item ids and I use it for indexing the
>>> MatrixEntry objects
>>>
>>>
>>> On Mon, Aug 31, 2015 at 10:41 AM Sean Owen <so...@cloudera.com> wrote:
>>>
>>>> It's not clear; that error is different still and somehow suggests
>>>> you're serializing a stream somewhere. I'd look at what's inside
>>>> bcItemsIdx as that is not shown here.
>>>>
>>>> On Mon, Aug 31, 2015 at 3:34 PM, Ashish Shrowty
>>>>
>>>> <as...@gmail.com> wrote:
>>>> > Sean,
>>>> >
>>>> > Thanks for your comments. What I was really trying to do was to
>>>> transform a
>>>> > RDD[(userid,itemid,ratings)] into a RowMatrix so that I can do some
>>>> column
>>>> > similarity calculations while exploring the data before building some
>>>> > models. But to do that I need to first convert the user and item ids
>>>> into
>>>> > respective indexes where I intended on passing in an array into the
>>>> closure,
>>>> > which is where I got stuck with this overflowerror trying to figure
>>>> out
>>>> > where it is happening. The actual error I got was slightly different
>>>> (Caused
>>>> > by: java.io.NotSerializableException: java.io.ObjectInputStream). I
>>>> started
>>>> > investigating this issue which led me to the earlier code snippet
>>>> that I had
>>>> > posted. This is again because of the bcItemsIdx variable being passed
>>>> into
>>>> > the closure. Below code works if I don't pass in the variable and use
>>>> simply
>>>> > a constant like 10 in its place .. The code thus far -
>>>> >
>>>> > // rdd below is RDD[(String,String,Double)]
>>>> > // bcItemsIdx below is Broadcast[Array[String]] which is an array of
>>>> item
>>>> > ids
>>>> > val gRdd = rdd.map{case(user,item,rating) =>
>>>> > ((user),(item,rating))}.groupByKey
>>>> > val idxRdd = gRdd.zipWithIndex
>>>> > val cm = new CoordinateMatrix(
>>>> >     idxRdd.flatMap[MatrixEntry](e => {
>>>> >         e._1._2.map(item=> {
>>>> >                  MatrixEntry(e._2, bcItemsIdx.value.indexOf(item._1),
>>>> > item._2) // <- This is where I get the Serialization error passing in
>>>> the
>>>> > index
>>>> >                  // MatrixEntry(e._2, 10, item._2) // <- This works
>>>> >         })
>>>> >     })
>>>> > )
>>>> > val rm = cm.toRowMatrix
>>>> > val simMatrix = rm.columnSimilarities()
>>>> >
>>>> > I would like to make this work in the Spark shell as I am still
>>>> exploring
>>>> > the data. Let me know if there is an alternate way of constructing the
>>>> > RowMatrix.
>>>> >
>>>> > Thanks and appreciate all the help!
>>>> >
>>>> > Ashish
>>>> >
>>>> > On Mon, Aug 31, 2015 at 3:41 AM Sean Owen <so...@cloudera.com> wrote:
>>>> >>
>>>> >> Yeah I see that now. I think it fails immediately because the map
>>>> >> operation does try to clean and/or verify the serialization of the
>>>> >> closure upfront.
>>>> >>
>>>> >> I'm not quite sure what is going on, but I think it's some strange
>>>> >> interaction between how you're building up the list and what the
>>>> >> resulting representation happens to be like, and how the closure
>>>> >> cleaner works, which can't be perfect. The shell also introduces an
>>>> >> extra layer of issues.
>>>> >>
>>>> >> For example, the slightly more canonical approaches work fine:
>>>> >>
>>>> >> import scala.collection.mutable.MutableList
>>>> >> val lst = MutableList[(String,String,Double)]()
>>>> >> (0 to 10000).foreach(i => lst :+ ("10", "10", i.toDouble))
>>>> >>
>>>> >> or just
>>>> >>
>>>> >> val lst = (0 to 10000).map(i => ("10", "10", i.toDouble))
>>>> >>
>>>> >> If you just need this to work, maybe those are better alternatives
>>>> anyway.
>>>> >> You can also check whether it works without the shell, as I suspect
>>>> >> that's a factor.
>>>> >>
>>>> >> It's not an error in Spark per se but saying that something's default
>>>> >> Java serialization graph is very deep, so it's like the code you
>>>> wrote
>>>> >> plus the closure cleaner ends up pulling in some huge linked list and
>>>> >> serializing it the direct and unuseful way.
>>>> >>
>>>> >> If you have an idea about exactly why it's happening you can open a
>>>> >> JIRA, but arguably it's something that's nice to just work but isn't
>>>> >> to do with Spark per se. Or, have a look at others related to the
>>>> >> closure and shell and you may find this is related to other known
>>>> >> behavior.
>>>> >>
>>>> >>
>>>> >> On Sun, Aug 30, 2015 at 8:08 PM, Ashish Shrowty
>>>> >> <as...@gmail.com> wrote:
>>>> >> > Sean .. does the code below work for you in the Spark shell? Ted
>>>> got the
>>>> >> > same error -
>>>> >> >
>>>> >> > val a=10
>>>> >> > val lst = MutableList[(String,String,Double)]()
>>>> >> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>>> >> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>> >> >
>>>> >> > -Ashish
>>>> >> >
>>>> >> >
>>>> >> > On Sun, Aug 30, 2015 at 2:52 PM Sean Owen <so...@cloudera.com>
>>>> wrote:
>>>> >> >>
>>>> >> >> I'm not sure how to reproduce it? this code does not produce an
>>>> error
>>>> >> >> in
>>>> >> >> master.
>>>> >> >>
>>>> >> >> On Sun, Aug 30, 2015 at 7:26 PM, Ashish Shrowty
>>>> >> >> <as...@gmail.com> wrote:
>>>> >> >> > Do you think I should create a JIRA?
>>>> >> >> >
>>>> >> >> >
>>>> >> >> > On Sun, Aug 30, 2015 at 12:56 PM Ted Yu <yu...@gmail.com>
>>>> wrote:
>>>> >> >> >>
>>>> >> >> >> I got StackOverFlowError as well :-(
>>>> >> >> >>
>>>> >> >> >> On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty
>>>> >> >> >> <as...@gmail.com>
>>>> >> >> >> wrote:
>>>> >> >> >>>
>>>> >> >> >>> Yep .. I tried that too earlier. Doesn't make a difference.
>>>> Are you
>>>> >> >> >>> able
>>>> >> >> >>> to replicate on your side?
>>>> >> >> >>>
>>>> >> >> >>>
>>>> >> >> >>> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <yu...@gmail.com>
>>>> >> >> >>> wrote:
>>>> >> >> >>>>
>>>> >> >> >>>> I see.
>>>> >> >> >>>>
>>>> >> >> >>>> What about using the following in place of variable a ?
>>>> >> >> >>>>
>>>> >> >> >>>>
>>>> >> >> >>>>
>>>> >> >> >>>>
>>>> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>>>> >> >> >>>>
>>>> >> >> >>>> Cheers
>>>> >> >> >>>>
>>>> >> >> >>>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty
>>>> >> >> >>>> <as...@gmail.com> wrote:
>>>> >> >> >>>>>
>>>> >> >> >>>>> @Sean - Agree that there is no action, but I still get the
>>>> >> >> >>>>> stackoverflowerror, its very weird
>>>> >> >> >>>>>
>>>> >> >> >>>>> @Ted - Variable a is just an int - val a = 10 ... The error
>>>> >> >> >>>>> happens
>>>> >> >> >>>>> when I try to pass a variable into the closure. The example
>>>> you
>>>> >> >> >>>>> have
>>>> >> >> >>>>> above
>>>> >> >> >>>>> works fine since there is no variable being passed into the
>>>> >> >> >>>>> closure
>>>> >> >> >>>>> from the
>>>> >> >> >>>>> shell.
>>>> >> >> >>>>>
>>>> >> >> >>>>> -Ashish
>>>> >> >> >>>>>
>>>> >> >> >>>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yuzhihong@gmail.com
>>>> >
>>>> >> >> >>>>> wrote:
>>>> >> >> >>>>>>
>>>> >> >> >>>>>> Using Spark shell :
>>>> >> >> >>>>>>
>>>> >> >> >>>>>> scala> import scala.collection.mutable.MutableList
>>>> >> >> >>>>>> import scala.collection.mutable.MutableList
>>>> >> >> >>>>>>
>>>> >> >> >>>>>> scala> val lst = MutableList[(String,String,Double)]()
>>>> >> >> >>>>>> lst: scala.collection.mutable.MutableList[(String, String,
>>>> >> >> >>>>>> Double)]
>>>> >> >> >>>>>> =
>>>> >> >> >>>>>> MutableList()
>>>> >> >> >>>>>>
>>>> >> >> >>>>>> scala>
>>>> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>>> >> >> >>>>>>
>>>> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>> >> >> >>>>>> <console>:27: error: not found: value a
>>>> >> >> >>>>>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>> >> >> >>>>>>                                           ^
>>>> >> >> >>>>>>
>>>> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else
>>>> 0)
>>>> >> >> >>>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1]
>>>> at map
>>>> >> >> >>>>>> at
>>>> >> >> >>>>>> <console>:27
>>>> >> >> >>>>>>
>>>> >> >> >>>>>> scala> rdd.count()
>>>> >> >> >>>>>> ...
>>>> >> >> >>>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count
>>>> at
>>>> >> >> >>>>>> <console>:30, took 0.478350 s
>>>> >> >> >>>>>> res1: Long = 10000
>>>> >> >> >>>>>>
>>>> >> >> >>>>>> Ashish:
>>>> >> >> >>>>>> Please refine your example to mimic more closely what your
>>>> code
>>>> >> >> >>>>>> actually did.
>>>> >> >> >>>>>>
>>>> >> >> >>>>>> Thanks
>>>> >> >> >>>>>>
>>>> >> >> >>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <
>>>> sowen@cloudera.com>
>>>> >> >> >>>>>> wrote:
>>>> >> >> >>>>>>>
>>>> >> >> >>>>>>> That can't cause any error, since there is no action in
>>>> your
>>>> >> >> >>>>>>> first
>>>> >> >> >>>>>>> snippet. Even calling count on the result doesn't cause an
>>>> >> >> >>>>>>> error.
>>>> >> >> >>>>>>> You
>>>> >> >> >>>>>>> must be executing something different.
>>>> >> >> >>>>>>>
>>>> >> >> >>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty
>>>> >> >> >>>>>>> <as...@gmail.com>
>>>> >> >> >>>>>>> wrote:
>>>> >> >> >>>>>>> > I am running the Spark shell (1.2.1) in local mode and
>>>> I have
>>>> >> >> >>>>>>> > a
>>>> >> >> >>>>>>> > simple
>>>> >> >> >>>>>>> > RDD[(String,String,Double)] with about 10,000 objects
>>>> in it.
>>>> >> >> >>>>>>> > I
>>>> >> >> >>>>>>> > get
>>>> >> >> >>>>>>> > a
>>>> >> >> >>>>>>> > StackOverFlowError each time I try to run the following
>>>> code
>>>> >> >> >>>>>>> > (the
>>>> >> >> >>>>>>> > code
>>>> >> >> >>>>>>> > itself is just representative of other logic where I
>>>> need to
>>>> >> >> >>>>>>> > pass
>>>> >> >> >>>>>>> > in a
>>>> >> >> >>>>>>> > variable). I tried broadcasting the variable too, but
>>>> no luck
>>>> >> >> >>>>>>> > ..
>>>> >> >> >>>>>>> > missing
>>>> >> >> >>>>>>> > something basic here -
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>)
>>>> >> >> >>>>>>> > val a=10
>>>> >> >> >>>>>>> > rdd.map(r => if (a==10) 1 else 0)
>>>> >> >> >>>>>>> > This throws -
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> > java.lang.StackOverflowError
>>>> >> >> >>>>>>> >     at
>>>> >> >> >>>>>>> >
>>>> java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
>>>> >> >> >>>>>>> >     at
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
>>>> >> >> >>>>>>> >     at
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>> >> >> >>>>>>> >     at
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>> >> >> >>>>>>> >     at
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>> >> >> >>>>>>> >     at
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>> >> >> >>>>>>> >     at
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>> >> >> >>>>>>> >     at
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>> >> >> >>>>>>> >     at
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>> >> >> >>>>>>> > ...
>>>> >> >> >>>>>>> > ...
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> > More experiments  .. this works -
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> > val lst =
>>>> Range(0,10000).map(i=>("10","10",i:Double)).toList
>>>> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> > But below doesn't and throws the StackoverflowError -
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> > val lst = MutableList[(String,String,Double)]()
>>>> >> >> >>>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>>> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> > Any help appreciated!
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> > Thanks,
>>>> >> >> >>>>>>> > Ashish
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> > --
>>>> >> >> >>>>>>> > View this message in context:
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
>>>> >> >> >>>>>>> > Sent from the Apache Spark User List mailing list
>>>> archive at
>>>> >> >> >>>>>>> > Nabble.com.
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>> >
>>>> ---------------------------------------------------------------------
>>>> >> >> >>>>>>> > To unsubscribe, e-mail:
>>>> user-unsubscribe@spark.apache.org
>>>> >> >> >>>>>>> > For additional commands, e-mail:
>>>> user-help@spark.apache.org
>>>> >> >> >>>>>>> >
>>>> >> >> >>>>>>>
>>>> >> >> >>>>>>>
>>>> >> >> >>>>>>>
>>>> >> >> >>>>>>>
>>>> ---------------------------------------------------------------------
>>>> >> >> >>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>> >> >> >>>>>>> For additional commands, e-mail:
>>>> user-help@spark.apache.org
>>>> >> >> >>>>>>>
>>>> >> >> >>>>>>
>>>> >> >> >>>>
>>>> >> >> >>
>>>> >> >> >
>>>>
>>>
>>

Re: Spark shell and StackOverFlowError

Posted by Ashish Shrowty <as...@gmail.com>.
Sure .. here it is (scroll below to see the NotSerializableException). Note
that upstream, I do load up the (user,item,ratings) data from a file using
ObjectInputStream, do some calculations that I put in a map and then create
the rdd used in the code above from that map. I even tried checkpointing
the rdd and persisting it to break any lineage to the original
ObjectInputStream (if that was what was happening) -

org.apache.spark.SparkException: Task not serializable

at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)

at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)

at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)

at org.apache.spark.rdd.RDD.flatMap(RDD.scala:295)

at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)

at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)

at $iwC$$iwC$$iwC$$iwC.<init>(<console>:48)

at $iwC$$iwC$$iwC.<init>(<console>:50)

at $iwC$$iwC.<init>(<console>:52)

at $iwC.<init>(<console>:54)

at <init>(<console>:56)

at .<init>(<console>:60)

at .<clinit>(<console>)

at .<init>(<console>:7)

at .<clinit>(<console>)

at $print(<console>)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)

at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)

at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)

at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)

at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)

at org.apache.spark.repl.SparkILoop.pasteCommand(SparkILoop.scala:796)

at
org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:321)

at
org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:321)

at
scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)

at
scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)

at
scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76)

at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)

at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)

at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)

at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)

at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)

at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)

at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)

at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)

at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)

at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)

at org.apache.spark.repl.Main$.main(Main.scala:31)

at org.apache.spark.repl.Main.main(Main.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)

at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

*Caused by: java.io.NotSerializableException: java.io.ObjectInputStream*

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)

at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)

at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)

at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)

at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)

at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)

at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)

...

...

On Mon, Aug 31, 2015 at 12:23 PM Ted Yu <yu...@gmail.com> wrote:

> Ashish:
> Can you post the complete stack trace for NotSerializableException ?
>
> Cheers
>
> On Mon, Aug 31, 2015 at 8:49 AM, Ashish Shrowty <as...@gmail.com>
> wrote:
>
>> bcItemsIdx is just a broadcast variable constructed out of
>> Array[(String)] .. it holds the item ids and I use it for indexing the
>> MatrixEntry objects
>>
>>
>> On Mon, Aug 31, 2015 at 10:41 AM Sean Owen <so...@cloudera.com> wrote:
>>
>>> It's not clear; that error is different still and somehow suggests
>>> you're serializing a stream somewhere. I'd look at what's inside
>>> bcItemsIdx as that is not shown here.
>>>
>>> On Mon, Aug 31, 2015 at 3:34 PM, Ashish Shrowty
>>>
>>> <as...@gmail.com> wrote:
>>> > Sean,
>>> >
>>> > Thanks for your comments. What I was really trying to do was to
>>> transform a
>>> > RDD[(userid,itemid,ratings)] into a RowMatrix so that I can do some
>>> column
>>> > similarity calculations while exploring the data before building some
>>> > models. But to do that I need to first convert the user and item ids
>>> into
>>> > respective indexes where I intended on passing in an array into the
>>> closure,
>>> > which is where I got stuck with this overflowerror trying to figure out
>>> > where it is happening. The actual error I got was slightly different
>>> (Caused
>>> > by: java.io.NotSerializableException: java.io.ObjectInputStream). I
>>> started
>>> > investigating this issue which led me to the earlier code snippet that
>>> I had
>>> > posted. This is again because of the bcItemsIdx variable being passed
>>> into
>>> > the closure. Below code works if I don't pass in the variable and use
>>> simply
>>> > a constant like 10 in its place .. The code thus far -
>>> >
>>> > // rdd below is RDD[(String,String,Double)]
>>> > // bcItemsIdx below is Broadcast[Array[String]] which is an array of
>>> item
>>> > ids
>>> > val gRdd = rdd.map{case(user,item,rating) =>
>>> > ((user),(item,rating))}.groupByKey
>>> > val idxRdd = gRdd.zipWithIndex
>>> > val cm = new CoordinateMatrix(
>>> >     idxRdd.flatMap[MatrixEntry](e => {
>>> >         e._1._2.map(item=> {
>>> >                  MatrixEntry(e._2, bcItemsIdx.value.indexOf(item._1),
>>> > item._2) // <- This is where I get the Serialization error passing in
>>> the
>>> > index
>>> >                  // MatrixEntry(e._2, 10, item._2) // <- This works
>>> >         })
>>> >     })
>>> > )
>>> > val rm = cm.toRowMatrix
>>> > val simMatrix = rm.columnSimilarities()
>>> >
>>> > I would like to make this work in the Spark shell as I am still
>>> exploring
>>> > the data. Let me know if there is an alternate way of constructing the
>>> > RowMatrix.
>>> >
>>> > Thanks and appreciate all the help!
>>> >
>>> > Ashish
>>> >
>>> > On Mon, Aug 31, 2015 at 3:41 AM Sean Owen <so...@cloudera.com> wrote:
>>> >>
>>> >> Yeah I see that now. I think it fails immediately because the map
>>> >> operation does try to clean and/or verify the serialization of the
>>> >> closure upfront.
>>> >>
>>> >> I'm not quite sure what is going on, but I think it's some strange
>>> >> interaction between how you're building up the list and what the
>>> >> resulting representation happens to be like, and how the closure
>>> >> cleaner works, which can't be perfect. The shell also introduces an
>>> >> extra layer of issues.
>>> >>
>>> >> For example, the slightly more canonical approaches work fine:
>>> >>
>>> >> import scala.collection.mutable.MutableList
>>> >> val lst = MutableList[(String,String,Double)]()
>>> >> (0 to 10000).foreach(i => lst :+ ("10", "10", i.toDouble))
>>> >>
>>> >> or just
>>> >>
>>> >> val lst = (0 to 10000).map(i => ("10", "10", i.toDouble))
>>> >>
>>> >> If you just need this to work, maybe those are better alternatives
>>> anyway.
>>> >> You can also check whether it works without the shell, as I suspect
>>> >> that's a factor.
>>> >>
>>> >> It's not an error in Spark per se but saying that something's default
>>> >> Java serialization graph is very deep, so it's like the code you wrote
>>> >> plus the closure cleaner ends up pulling in some huge linked list and
>>> >> serializing it the direct and unuseful way.
>>> >>
>>> >> If you have an idea about exactly why it's happening you can open a
>>> >> JIRA, but arguably it's something that's nice to just work but isn't
>>> >> to do with Spark per se. Or, have a look at others related to the
>>> >> closure and shell and you may find this is related to other known
>>> >> behavior.
>>> >>
>>> >>
>>> >> On Sun, Aug 30, 2015 at 8:08 PM, Ashish Shrowty
>>> >> <as...@gmail.com> wrote:
>>> >> > Sean .. does the code below work for you in the Spark shell? Ted
>>> got the
>>> >> > same error -
>>> >> >
>>> >> > val a=10
>>> >> > val lst = MutableList[(String,String,Double)]()
>>> >> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>> >> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>> >> >
>>> >> > -Ashish
>>> >> >
>>> >> >
>>> >> > On Sun, Aug 30, 2015 at 2:52 PM Sean Owen <so...@cloudera.com>
>>> wrote:
>>> >> >>
>>> >> >> I'm not sure how to reproduce it? this code does not produce an
>>> error
>>> >> >> in
>>> >> >> master.
>>> >> >>
>>> >> >> On Sun, Aug 30, 2015 at 7:26 PM, Ashish Shrowty
>>> >> >> <as...@gmail.com> wrote:
>>> >> >> > Do you think I should create a JIRA?
>>> >> >> >
>>> >> >> >
>>> >> >> > On Sun, Aug 30, 2015 at 12:56 PM Ted Yu <yu...@gmail.com>
>>> wrote:
>>> >> >> >>
>>> >> >> >> I got StackOverFlowError as well :-(
>>> >> >> >>
>>> >> >> >> On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty
>>> >> >> >> <as...@gmail.com>
>>> >> >> >> wrote:
>>> >> >> >>>
>>> >> >> >>> Yep .. I tried that too earlier. Doesn't make a difference.
>>> Are you
>>> >> >> >>> able
>>> >> >> >>> to replicate on your side?
>>> >> >> >>>
>>> >> >> >>>
>>> >> >> >>> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <yu...@gmail.com>
>>> >> >> >>> wrote:
>>> >> >> >>>>
>>> >> >> >>>> I see.
>>> >> >> >>>>
>>> >> >> >>>> What about using the following in place of variable a ?
>>> >> >> >>>>
>>> >> >> >>>>
>>> >> >> >>>>
>>> >> >> >>>>
>>> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>>> >> >> >>>>
>>> >> >> >>>> Cheers
>>> >> >> >>>>
>>> >> >> >>>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty
>>> >> >> >>>> <as...@gmail.com> wrote:
>>> >> >> >>>>>
>>> >> >> >>>>> @Sean - Agree that there is no action, but I still get the
>>> >> >> >>>>> stackoverflowerror, its very weird
>>> >> >> >>>>>
>>> >> >> >>>>> @Ted - Variable a is just an int - val a = 10 ... The error
>>> >> >> >>>>> happens
>>> >> >> >>>>> when I try to pass a variable into the closure. The example
>>> you
>>> >> >> >>>>> have
>>> >> >> >>>>> above
>>> >> >> >>>>> works fine since there is no variable being passed into the
>>> >> >> >>>>> closure
>>> >> >> >>>>> from the
>>> >> >> >>>>> shell.
>>> >> >> >>>>>
>>> >> >> >>>>> -Ashish
>>> >> >> >>>>>
>>> >> >> >>>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yu...@gmail.com>
>>> >> >> >>>>> wrote:
>>> >> >> >>>>>>
>>> >> >> >>>>>> Using Spark shell :
>>> >> >> >>>>>>
>>> >> >> >>>>>> scala> import scala.collection.mutable.MutableList
>>> >> >> >>>>>> import scala.collection.mutable.MutableList
>>> >> >> >>>>>>
>>> >> >> >>>>>> scala> val lst = MutableList[(String,String,Double)]()
>>> >> >> >>>>>> lst: scala.collection.mutable.MutableList[(String, String,
>>> >> >> >>>>>> Double)]
>>> >> >> >>>>>> =
>>> >> >> >>>>>> MutableList()
>>> >> >> >>>>>>
>>> >> >> >>>>>> scala>
>>> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>> >> >> >>>>>>
>>> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>> >> >> >>>>>> <console>:27: error: not found: value a
>>> >> >> >>>>>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>> >> >> >>>>>>                                           ^
>>> >> >> >>>>>>
>>> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else
>>> 0)
>>> >> >> >>>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at
>>> map
>>> >> >> >>>>>> at
>>> >> >> >>>>>> <console>:27
>>> >> >> >>>>>>
>>> >> >> >>>>>> scala> rdd.count()
>>> >> >> >>>>>> ...
>>> >> >> >>>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count
>>> at
>>> >> >> >>>>>> <console>:30, took 0.478350 s
>>> >> >> >>>>>> res1: Long = 10000
>>> >> >> >>>>>>
>>> >> >> >>>>>> Ashish:
>>> >> >> >>>>>> Please refine your example to mimic more closely what your
>>> code
>>> >> >> >>>>>> actually did.
>>> >> >> >>>>>>
>>> >> >> >>>>>> Thanks
>>> >> >> >>>>>>
>>> >> >> >>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <
>>> sowen@cloudera.com>
>>> >> >> >>>>>> wrote:
>>> >> >> >>>>>>>
>>> >> >> >>>>>>> That can't cause any error, since there is no action in
>>> your
>>> >> >> >>>>>>> first
>>> >> >> >>>>>>> snippet. Even calling count on the result doesn't cause an
>>> >> >> >>>>>>> error.
>>> >> >> >>>>>>> You
>>> >> >> >>>>>>> must be executing something different.
>>> >> >> >>>>>>>
>>> >> >> >>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty
>>> >> >> >>>>>>> <as...@gmail.com>
>>> >> >> >>>>>>> wrote:
>>> >> >> >>>>>>> > I am running the Spark shell (1.2.1) in local mode and I
>>> have
>>> >> >> >>>>>>> > a
>>> >> >> >>>>>>> > simple
>>> >> >> >>>>>>> > RDD[(String,String,Double)] with about 10,000 objects in
>>> it.
>>> >> >> >>>>>>> > I
>>> >> >> >>>>>>> > get
>>> >> >> >>>>>>> > a
>>> >> >> >>>>>>> > StackOverFlowError each time I try to run the following
>>> code
>>> >> >> >>>>>>> > (the
>>> >> >> >>>>>>> > code
>>> >> >> >>>>>>> > itself is just representative of other logic where I
>>> need to
>>> >> >> >>>>>>> > pass
>>> >> >> >>>>>>> > in a
>>> >> >> >>>>>>> > variable). I tried broadcasting the variable too, but no
>>> luck
>>> >> >> >>>>>>> > ..
>>> >> >> >>>>>>> > missing
>>> >> >> >>>>>>> > something basic here -
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>)
>>> >> >> >>>>>>> > val a=10
>>> >> >> >>>>>>> > rdd.map(r => if (a==10) 1 else 0)
>>> >> >> >>>>>>> > This throws -
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> > java.lang.StackOverflowError
>>> >> >> >>>>>>> >     at
>>> >> >> >>>>>>> >
>>> java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
>>> >> >> >>>>>>> >     at
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
>>> >> >> >>>>>>> >     at
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>> >> >> >>>>>>> >     at
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>> >> >> >>>>>>> >     at
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>> >> >> >>>>>>> >     at
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>> >> >> >>>>>>> >     at
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>> >> >> >>>>>>> >     at
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>> >> >> >>>>>>> >     at
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>> >> >> >>>>>>> > ...
>>> >> >> >>>>>>> > ...
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> > More experiments  .. this works -
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> > val lst =
>>> Range(0,10000).map(i=>("10","10",i:Double)).toList
>>> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> > But below doesn't and throws the StackoverflowError -
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> > val lst = MutableList[(String,String,Double)]()
>>> >> >> >>>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> > Any help appreciated!
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> > Thanks,
>>> >> >> >>>>>>> > Ashish
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> > --
>>> >> >> >>>>>>> > View this message in context:
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
>>> >> >> >>>>>>> > Sent from the Apache Spark User List mailing list
>>> archive at
>>> >> >> >>>>>>> > Nabble.com.
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>> >
>>> ---------------------------------------------------------------------
>>> >> >> >>>>>>> > To unsubscribe, e-mail:
>>> user-unsubscribe@spark.apache.org
>>> >> >> >>>>>>> > For additional commands, e-mail:
>>> user-help@spark.apache.org
>>> >> >> >>>>>>> >
>>> >> >> >>>>>>>
>>> >> >> >>>>>>>
>>> >> >> >>>>>>>
>>> >> >> >>>>>>>
>>> ---------------------------------------------------------------------
>>> >> >> >>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> >> >> >>>>>>> For additional commands, e-mail:
>>> user-help@spark.apache.org
>>> >> >> >>>>>>>
>>> >> >> >>>>>>
>>> >> >> >>>>
>>> >> >> >>
>>> >> >> >
>>>
>>
>

Re: Spark shell and StackOverFlowError

Posted by Ted Yu <yu...@gmail.com>.
Ashish:
Can you post the complete stack trace for NotSerializableException ?

Cheers

On Mon, Aug 31, 2015 at 8:49 AM, Ashish Shrowty <as...@gmail.com>
wrote:

> bcItemsIdx is just a broadcast variable constructed out of Array[(String)]
> .. it holds the item ids and I use it for indexing the MatrixEntry objects
>
>
> On Mon, Aug 31, 2015 at 10:41 AM Sean Owen <so...@cloudera.com> wrote:
>
>> It's not clear; that error is different still and somehow suggests
>> you're serializing a stream somewhere. I'd look at what's inside
>> bcItemsIdx as that is not shown here.
>>
>> On Mon, Aug 31, 2015 at 3:34 PM, Ashish Shrowty
>>
>> <as...@gmail.com> wrote:
>> > Sean,
>> >
>> > Thanks for your comments. What I was really trying to do was to
>> transform a
>> > RDD[(userid,itemid,ratings)] into a RowMatrix so that I can do some
>> column
>> > similarity calculations while exploring the data before building some
>> > models. But to do that I need to first convert the user and item ids
>> into
>> > respective indexes where I intended on passing in an array into the
>> closure,
>> > which is where I got stuck with this overflowerror trying to figure out
>> > where it is happening. The actual error I got was slightly different
>> (Caused
>> > by: java.io.NotSerializableException: java.io.ObjectInputStream). I
>> started
>> > investigating this issue which led me to the earlier code snippet that
>> I had
>> > posted. This is again because of the bcItemsIdx variable being passed
>> into
>> > the closure. Below code works if I don't pass in the variable and use
>> simply
>> > a constant like 10 in its place .. The code thus far -
>> >
>> > // rdd below is RDD[(String,String,Double)]
>> > // bcItemsIdx below is Broadcast[Array[String]] which is an array of
>> item
>> > ids
>> > val gRdd = rdd.map{case(user,item,rating) =>
>> > ((user),(item,rating))}.groupByKey
>> > val idxRdd = gRdd.zipWithIndex
>> > val cm = new CoordinateMatrix(
>> >     idxRdd.flatMap[MatrixEntry](e => {
>> >         e._1._2.map(item=> {
>> >                  MatrixEntry(e._2, bcItemsIdx.value.indexOf(item._1),
>> > item._2) // <- This is where I get the Serialization error passing in
>> the
>> > index
>> >                  // MatrixEntry(e._2, 10, item._2) // <- This works
>> >         })
>> >     })
>> > )
>> > val rm = cm.toRowMatrix
>> > val simMatrix = rm.columnSimilarities()
>> >
>> > I would like to make this work in the Spark shell as I am still
>> exploring
>> > the data. Let me know if there is an alternate way of constructing the
>> > RowMatrix.
>> >
>> > Thanks and appreciate all the help!
>> >
>> > Ashish
>> >
>> > On Mon, Aug 31, 2015 at 3:41 AM Sean Owen <so...@cloudera.com> wrote:
>> >>
>> >> Yeah I see that now. I think it fails immediately because the map
>> >> operation does try to clean and/or verify the serialization of the
>> >> closure upfront.
>> >>
>> >> I'm not quite sure what is going on, but I think it's some strange
>> >> interaction between how you're building up the list and what the
>> >> resulting representation happens to be like, and how the closure
>> >> cleaner works, which can't be perfect. The shell also introduces an
>> >> extra layer of issues.
>> >>
>> >> For example, the slightly more canonical approaches work fine:
>> >>
>> >> import scala.collection.mutable.MutableList
>> >> val lst = MutableList[(String,String,Double)]()
>> >> (0 to 10000).foreach(i => lst :+ ("10", "10", i.toDouble))
>> >>
>> >> or just
>> >>
>> >> val lst = (0 to 10000).map(i => ("10", "10", i.toDouble))
>> >>
>> >> If you just need this to work, maybe those are better alternatives
>> anyway.
>> >> You can also check whether it works without the shell, as I suspect
>> >> that's a factor.
>> >>
>> >> It's not an error in Spark per se but saying that something's default
>> >> Java serialization graph is very deep, so it's like the code you wrote
>> >> plus the closure cleaner ends up pulling in some huge linked list and
>> >> serializing it the direct and unuseful way.
>> >>
>> >> If you have an idea about exactly why it's happening you can open a
>> >> JIRA, but arguably it's something that's nice to just work but isn't
>> >> to do with Spark per se. Or, have a look at others related to the
>> >> closure and shell and you may find this is related to other known
>> >> behavior.
>> >>
>> >>
>> >> On Sun, Aug 30, 2015 at 8:08 PM, Ashish Shrowty
>> >> <as...@gmail.com> wrote:
>> >> > Sean .. does the code below work for you in the Spark shell? Ted got
>> the
>> >> > same error -
>> >> >
>> >> > val a=10
>> >> > val lst = MutableList[(String,String,Double)]()
>> >> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>> >> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >
>> >> > -Ashish
>> >> >
>> >> >
>> >> > On Sun, Aug 30, 2015 at 2:52 PM Sean Owen <so...@cloudera.com>
>> wrote:
>> >> >>
>> >> >> I'm not sure how to reproduce it? this code does not produce an
>> error
>> >> >> in
>> >> >> master.
>> >> >>
>> >> >> On Sun, Aug 30, 2015 at 7:26 PM, Ashish Shrowty
>> >> >> <as...@gmail.com> wrote:
>> >> >> > Do you think I should create a JIRA?
>> >> >> >
>> >> >> >
>> >> >> > On Sun, Aug 30, 2015 at 12:56 PM Ted Yu <yu...@gmail.com>
>> wrote:
>> >> >> >>
>> >> >> >> I got StackOverFlowError as well :-(
>> >> >> >>
>> >> >> >> On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty
>> >> >> >> <as...@gmail.com>
>> >> >> >> wrote:
>> >> >> >>>
>> >> >> >>> Yep .. I tried that too earlier. Doesn't make a difference. Are
>> you
>> >> >> >>> able
>> >> >> >>> to replicate on your side?
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <yu...@gmail.com>
>> >> >> >>> wrote:
>> >> >> >>>>
>> >> >> >>>> I see.
>> >> >> >>>>
>> >> >> >>>> What about using the following in place of variable a ?
>> >> >> >>>>
>> >> >> >>>>
>> >> >> >>>>
>> >> >> >>>>
>> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>> >> >> >>>>
>> >> >> >>>> Cheers
>> >> >> >>>>
>> >> >> >>>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty
>> >> >> >>>> <as...@gmail.com> wrote:
>> >> >> >>>>>
>> >> >> >>>>> @Sean - Agree that there is no action, but I still get the
>> >> >> >>>>> stackoverflowerror, its very weird
>> >> >> >>>>>
>> >> >> >>>>> @Ted - Variable a is just an int - val a = 10 ... The error
>> >> >> >>>>> happens
>> >> >> >>>>> when I try to pass a variable into the closure. The example
>> you
>> >> >> >>>>> have
>> >> >> >>>>> above
>> >> >> >>>>> works fine since there is no variable being passed into the
>> >> >> >>>>> closure
>> >> >> >>>>> from the
>> >> >> >>>>> shell.
>> >> >> >>>>>
>> >> >> >>>>> -Ashish
>> >> >> >>>>>
>> >> >> >>>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yu...@gmail.com>
>> >> >> >>>>> wrote:
>> >> >> >>>>>>
>> >> >> >>>>>> Using Spark shell :
>> >> >> >>>>>>
>> >> >> >>>>>> scala> import scala.collection.mutable.MutableList
>> >> >> >>>>>> import scala.collection.mutable.MutableList
>> >> >> >>>>>>
>> >> >> >>>>>> scala> val lst = MutableList[(String,String,Double)]()
>> >> >> >>>>>> lst: scala.collection.mutable.MutableList[(String, String,
>> >> >> >>>>>> Double)]
>> >> >> >>>>>> =
>> >> >> >>>>>> MutableList()
>> >> >> >>>>>>
>> >> >> >>>>>> scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>> >> >> >>>>>>
>> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >> >>>>>> <console>:27: error: not found: value a
>> >> >> >>>>>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >> >>>>>>                                           ^
>> >> >> >>>>>>
>> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
>> >> >> >>>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at
>> map
>> >> >> >>>>>> at
>> >> >> >>>>>> <console>:27
>> >> >> >>>>>>
>> >> >> >>>>>> scala> rdd.count()
>> >> >> >>>>>> ...
>> >> >> >>>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at
>> >> >> >>>>>> <console>:30, took 0.478350 s
>> >> >> >>>>>> res1: Long = 10000
>> >> >> >>>>>>
>> >> >> >>>>>> Ashish:
>> >> >> >>>>>> Please refine your example to mimic more closely what your
>> code
>> >> >> >>>>>> actually did.
>> >> >> >>>>>>
>> >> >> >>>>>> Thanks
>> >> >> >>>>>>
>> >> >> >>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <
>> sowen@cloudera.com>
>> >> >> >>>>>> wrote:
>> >> >> >>>>>>>
>> >> >> >>>>>>> That can't cause any error, since there is no action in your
>> >> >> >>>>>>> first
>> >> >> >>>>>>> snippet. Even calling count on the result doesn't cause an
>> >> >> >>>>>>> error.
>> >> >> >>>>>>> You
>> >> >> >>>>>>> must be executing something different.
>> >> >> >>>>>>>
>> >> >> >>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty
>> >> >> >>>>>>> <as...@gmail.com>
>> >> >> >>>>>>> wrote:
>> >> >> >>>>>>> > I am running the Spark shell (1.2.1) in local mode and I
>> have
>> >> >> >>>>>>> > a
>> >> >> >>>>>>> > simple
>> >> >> >>>>>>> > RDD[(String,String,Double)] with about 10,000 objects in
>> it.
>> >> >> >>>>>>> > I
>> >> >> >>>>>>> > get
>> >> >> >>>>>>> > a
>> >> >> >>>>>>> > StackOverFlowError each time I try to run the following
>> code
>> >> >> >>>>>>> > (the
>> >> >> >>>>>>> > code
>> >> >> >>>>>>> > itself is just representative of other logic where I need
>> to
>> >> >> >>>>>>> > pass
>> >> >> >>>>>>> > in a
>> >> >> >>>>>>> > variable). I tried broadcasting the variable too, but no
>> luck
>> >> >> >>>>>>> > ..
>> >> >> >>>>>>> > missing
>> >> >> >>>>>>> > something basic here -
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>)
>> >> >> >>>>>>> > val a=10
>> >> >> >>>>>>> > rdd.map(r => if (a==10) 1 else 0)
>> >> >> >>>>>>> > This throws -
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > java.lang.StackOverflowError
>> >> >> >>>>>>> >     at
>> >> >> >>>>>>> >
>> java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
>> >> >> >>>>>>> >     at
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
>> >> >> >>>>>>> >     at
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> >> >> >>>>>>> >     at
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> >> >> >>>>>>> >     at
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> >> >> >>>>>>> >     at
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>> >> >> >>>>>>> >     at
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> >> >> >>>>>>> >     at
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> >> >> >>>>>>> >     at
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> >> >> >>>>>>> > ...
>> >> >> >>>>>>> > ...
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > More experiments  .. this works -
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > val lst =
>> Range(0,10000).map(i=>("10","10",i:Double)).toList
>> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > But below doesn't and throws the StackoverflowError -
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > val lst = MutableList[(String,String,Double)]()
>> >> >> >>>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > Any help appreciated!
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > Thanks,
>> >> >> >>>>>>> > Ashish
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > --
>> >> >> >>>>>>> > View this message in context:
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
>> >> >> >>>>>>> > Sent from the Apache Spark User List mailing list archive
>> at
>> >> >> >>>>>>> > Nabble.com.
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> >> >> >>>>>>> >
>> ---------------------------------------------------------------------
>> >> >> >>>>>>> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >> >> >>>>>>> > For additional commands, e-mail:
>> user-help@spark.apache.org
>> >> >> >>>>>>> >
>> >> >> >>>>>>>
>> >> >> >>>>>>>
>> >> >> >>>>>>>
>> >> >> >>>>>>>
>> ---------------------------------------------------------------------
>> >> >> >>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >> >> >>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>> >> >> >>>>>>>
>> >> >> >>>>>>
>> >> >> >>>>
>> >> >> >>
>> >> >> >
>>
>

Re: Spark shell and StackOverFlowError

Posted by Ashish Shrowty <as...@gmail.com>.
bcItemsIdx is just a broadcast variable constructed out of Array[(String)]
.. it holds the item ids and I use it for indexing the MatrixEntry objects


On Mon, Aug 31, 2015 at 10:41 AM Sean Owen <so...@cloudera.com> wrote:

> It's not clear; that error is different still and somehow suggests
> you're serializing a stream somewhere. I'd look at what's inside
> bcItemsIdx as that is not shown here.
>
> On Mon, Aug 31, 2015 at 3:34 PM, Ashish Shrowty
> <as...@gmail.com> wrote:
> > Sean,
> >
> > Thanks for your comments. What I was really trying to do was to
> transform a
> > RDD[(userid,itemid,ratings)] into a RowMatrix so that I can do some
> column
> > similarity calculations while exploring the data before building some
> > models. But to do that I need to first convert the user and item ids into
> > respective indexes where I intended on passing in an array into the
> closure,
> > which is where I got stuck with this overflowerror trying to figure out
> > where it is happening. The actual error I got was slightly different
> (Caused
> > by: java.io.NotSerializableException: java.io.ObjectInputStream). I
> started
> > investigating this issue which led me to the earlier code snippet that I
> had
> > posted. This is again because of the bcItemsIdx variable being passed
> into
> > the closure. Below code works if I don't pass in the variable and use
> simply
> > a constant like 10 in its place .. The code thus far -
> >
> > // rdd below is RDD[(String,String,Double)]
> > // bcItemsIdx below is Broadcast[Array[String]] which is an array of item
> > ids
> > val gRdd = rdd.map{case(user,item,rating) =>
> > ((user),(item,rating))}.groupByKey
> > val idxRdd = gRdd.zipWithIndex
> > val cm = new CoordinateMatrix(
> >     idxRdd.flatMap[MatrixEntry](e => {
> >         e._1._2.map(item=> {
> >                  MatrixEntry(e._2, bcItemsIdx.value.indexOf(item._1),
> > item._2) // <- This is where I get the Serialization error passing in the
> > index
> >                  // MatrixEntry(e._2, 10, item._2) // <- This works
> >         })
> >     })
> > )
> > val rm = cm.toRowMatrix
> > val simMatrix = rm.columnSimilarities()
> >
> > I would like to make this work in the Spark shell as I am still exploring
> > the data. Let me know if there is an alternate way of constructing the
> > RowMatrix.
> >
> > Thanks and appreciate all the help!
> >
> > Ashish
> >
> > On Mon, Aug 31, 2015 at 3:41 AM Sean Owen <so...@cloudera.com> wrote:
> >>
> >> Yeah I see that now. I think it fails immediately because the map
> >> operation does try to clean and/or verify the serialization of the
> >> closure upfront.
> >>
> >> I'm not quite sure what is going on, but I think it's some strange
> >> interaction between how you're building up the list and what the
> >> resulting representation happens to be like, and how the closure
> >> cleaner works, which can't be perfect. The shell also introduces an
> >> extra layer of issues.
> >>
> >> For example, the slightly more canonical approaches work fine:
> >>
> >> import scala.collection.mutable.MutableList
> >> val lst = MutableList[(String,String,Double)]()
> >> (0 to 10000).foreach(i => lst :+ ("10", "10", i.toDouble))
> >>
> >> or just
> >>
> >> val lst = (0 to 10000).map(i => ("10", "10", i.toDouble))
> >>
> >> If you just need this to work, maybe those are better alternatives
> anyway.
> >> You can also check whether it works without the shell, as I suspect
> >> that's a factor.
> >>
> >> It's not an error in Spark per se but saying that something's default
> >> Java serialization graph is very deep, so it's like the code you wrote
> >> plus the closure cleaner ends up pulling in some huge linked list and
> >> serializing it the direct and unuseful way.
> >>
> >> If you have an idea about exactly why it's happening you can open a
> >> JIRA, but arguably it's something that's nice to just work but isn't
> >> to do with Spark per se. Or, have a look at others related to the
> >> closure and shell and you may find this is related to other known
> >> behavior.
> >>
> >>
> >> On Sun, Aug 30, 2015 at 8:08 PM, Ashish Shrowty
> >> <as...@gmail.com> wrote:
> >> > Sean .. does the code below work for you in the Spark shell? Ted got
> the
> >> > same error -
> >> >
> >> > val a=10
> >> > val lst = MutableList[(String,String,Double)]()
> >> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
> >> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >> >
> >> > -Ashish
> >> >
> >> >
> >> > On Sun, Aug 30, 2015 at 2:52 PM Sean Owen <so...@cloudera.com> wrote:
> >> >>
> >> >> I'm not sure how to reproduce it? this code does not produce an error
> >> >> in
> >> >> master.
> >> >>
> >> >> On Sun, Aug 30, 2015 at 7:26 PM, Ashish Shrowty
> >> >> <as...@gmail.com> wrote:
> >> >> > Do you think I should create a JIRA?
> >> >> >
> >> >> >
> >> >> > On Sun, Aug 30, 2015 at 12:56 PM Ted Yu <yu...@gmail.com>
> wrote:
> >> >> >>
> >> >> >> I got StackOverFlowError as well :-(
> >> >> >>
> >> >> >> On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty
> >> >> >> <as...@gmail.com>
> >> >> >> wrote:
> >> >> >>>
> >> >> >>> Yep .. I tried that too earlier. Doesn't make a difference. Are
> you
> >> >> >>> able
> >> >> >>> to replicate on your side?
> >> >> >>>
> >> >> >>>
> >> >> >>> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <yu...@gmail.com>
> >> >> >>> wrote:
> >> >> >>>>
> >> >> >>>> I see.
> >> >> >>>>
> >> >> >>>> What about using the following in place of variable a ?
> >> >> >>>>
> >> >> >>>>
> >> >> >>>>
> >> >> >>>>
> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
> >> >> >>>>
> >> >> >>>> Cheers
> >> >> >>>>
> >> >> >>>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty
> >> >> >>>> <as...@gmail.com> wrote:
> >> >> >>>>>
> >> >> >>>>> @Sean - Agree that there is no action, but I still get the
> >> >> >>>>> stackoverflowerror, its very weird
> >> >> >>>>>
> >> >> >>>>> @Ted - Variable a is just an int - val a = 10 ... The error
> >> >> >>>>> happens
> >> >> >>>>> when I try to pass a variable into the closure. The example you
> >> >> >>>>> have
> >> >> >>>>> above
> >> >> >>>>> works fine since there is no variable being passed into the
> >> >> >>>>> closure
> >> >> >>>>> from the
> >> >> >>>>> shell.
> >> >> >>>>>
> >> >> >>>>> -Ashish
> >> >> >>>>>
> >> >> >>>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yu...@gmail.com>
> >> >> >>>>> wrote:
> >> >> >>>>>>
> >> >> >>>>>> Using Spark shell :
> >> >> >>>>>>
> >> >> >>>>>> scala> import scala.collection.mutable.MutableList
> >> >> >>>>>> import scala.collection.mutable.MutableList
> >> >> >>>>>>
> >> >> >>>>>> scala> val lst = MutableList[(String,String,Double)]()
> >> >> >>>>>> lst: scala.collection.mutable.MutableList[(String, String,
> >> >> >>>>>> Double)]
> >> >> >>>>>> =
> >> >> >>>>>> MutableList()
> >> >> >>>>>>
> >> >> >>>>>> scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
> >> >> >>>>>>
> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >> >> >>>>>> <console>:27: error: not found: value a
> >> >> >>>>>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >> >> >>>>>>                                           ^
> >> >> >>>>>>
> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
> >> >> >>>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at
> map
> >> >> >>>>>> at
> >> >> >>>>>> <console>:27
> >> >> >>>>>>
> >> >> >>>>>> scala> rdd.count()
> >> >> >>>>>> ...
> >> >> >>>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at
> >> >> >>>>>> <console>:30, took 0.478350 s
> >> >> >>>>>> res1: Long = 10000
> >> >> >>>>>>
> >> >> >>>>>> Ashish:
> >> >> >>>>>> Please refine your example to mimic more closely what your
> code
> >> >> >>>>>> actually did.
> >> >> >>>>>>
> >> >> >>>>>> Thanks
> >> >> >>>>>>
> >> >> >>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <
> sowen@cloudera.com>
> >> >> >>>>>> wrote:
> >> >> >>>>>>>
> >> >> >>>>>>> That can't cause any error, since there is no action in your
> >> >> >>>>>>> first
> >> >> >>>>>>> snippet. Even calling count on the result doesn't cause an
> >> >> >>>>>>> error.
> >> >> >>>>>>> You
> >> >> >>>>>>> must be executing something different.
> >> >> >>>>>>>
> >> >> >>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty
> >> >> >>>>>>> <as...@gmail.com>
> >> >> >>>>>>> wrote:
> >> >> >>>>>>> > I am running the Spark shell (1.2.1) in local mode and I
> have
> >> >> >>>>>>> > a
> >> >> >>>>>>> > simple
> >> >> >>>>>>> > RDD[(String,String,Double)] with about 10,000 objects in
> it.
> >> >> >>>>>>> > I
> >> >> >>>>>>> > get
> >> >> >>>>>>> > a
> >> >> >>>>>>> > StackOverFlowError each time I try to run the following
> code
> >> >> >>>>>>> > (the
> >> >> >>>>>>> > code
> >> >> >>>>>>> > itself is just representative of other logic where I need
> to
> >> >> >>>>>>> > pass
> >> >> >>>>>>> > in a
> >> >> >>>>>>> > variable). I tried broadcasting the variable too, but no
> luck
> >> >> >>>>>>> > ..
> >> >> >>>>>>> > missing
> >> >> >>>>>>> > something basic here -
> >> >> >>>>>>> >
> >> >> >>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>)
> >> >> >>>>>>> > val a=10
> >> >> >>>>>>> > rdd.map(r => if (a==10) 1 else 0)
> >> >> >>>>>>> > This throws -
> >> >> >>>>>>> >
> >> >> >>>>>>> > java.lang.StackOverflowError
> >> >> >>>>>>> >     at
> >> >> >>>>>>> >
> java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
> >> >> >>>>>>> >     at
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
> >> >> >>>>>>> >     at
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> >> >> >>>>>>> >     at
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> >> >> >>>>>>> >     at
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> >> >> >>>>>>> >     at
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> >> >> >>>>>>> >     at
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> >> >> >>>>>>> >     at
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> >> >> >>>>>>> >     at
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> >> >> >>>>>>> > ...
> >> >> >>>>>>> > ...
> >> >> >>>>>>> >
> >> >> >>>>>>> > More experiments  .. this works -
> >> >> >>>>>>> >
> >> >> >>>>>>> > val lst =
> Range(0,10000).map(i=>("10","10",i:Double)).toList
> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >> >> >>>>>>> >
> >> >> >>>>>>> > But below doesn't and throws the StackoverflowError -
> >> >> >>>>>>> >
> >> >> >>>>>>> > val lst = MutableList[(String,String,Double)]()
> >> >> >>>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >> >> >>>>>>> >
> >> >> >>>>>>> > Any help appreciated!
> >> >> >>>>>>> >
> >> >> >>>>>>> > Thanks,
> >> >> >>>>>>> > Ashish
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> > --
> >> >> >>>>>>> > View this message in context:
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
> >> >> >>>>>>> > Sent from the Apache Spark User List mailing list archive
> at
> >> >> >>>>>>> > Nabble.com.
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> ---------------------------------------------------------------------
> >> >> >>>>>>> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> >> >> >>>>>>> > For additional commands, e-mail:
> user-help@spark.apache.org
> >> >> >>>>>>> >
> >> >> >>>>>>>
> >> >> >>>>>>>
> >> >> >>>>>>>
> >> >> >>>>>>>
> ---------------------------------------------------------------------
> >> >> >>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> >> >> >>>>>>> For additional commands, e-mail: user-help@spark.apache.org
> >> >> >>>>>>>
> >> >> >>>>>>
> >> >> >>>>
> >> >> >>
> >> >> >
>

Re: Spark shell and StackOverFlowError

Posted by Sean Owen <so...@cloudera.com>.
It's not clear; that error is different still and somehow suggests
you're serializing a stream somewhere. I'd look at what's inside
bcItemsIdx as that is not shown here.

On Mon, Aug 31, 2015 at 3:34 PM, Ashish Shrowty
<as...@gmail.com> wrote:
> Sean,
>
> Thanks for your comments. What I was really trying to do was to transform a
> RDD[(userid,itemid,ratings)] into a RowMatrix so that I can do some column
> similarity calculations while exploring the data before building some
> models. But to do that I need to first convert the user and item ids into
> respective indexes where I intended on passing in an array into the closure,
> which is where I got stuck with this overflowerror trying to figure out
> where it is happening. The actual error I got was slightly different (Caused
> by: java.io.NotSerializableException: java.io.ObjectInputStream). I started
> investigating this issue which led me to the earlier code snippet that I had
> posted. This is again because of the bcItemsIdx variable being passed into
> the closure. Below code works if I don't pass in the variable and use simply
> a constant like 10 in its place .. The code thus far -
>
> // rdd below is RDD[(String,String,Double)]
> // bcItemsIdx below is Broadcast[Array[String]] which is an array of item
> ids
> val gRdd = rdd.map{case(user,item,rating) =>
> ((user),(item,rating))}.groupByKey
> val idxRdd = gRdd.zipWithIndex
> val cm = new CoordinateMatrix(
>     idxRdd.flatMap[MatrixEntry](e => {
>         e._1._2.map(item=> {
>                  MatrixEntry(e._2, bcItemsIdx.value.indexOf(item._1),
> item._2) // <- This is where I get the Serialization error passing in the
> index
>                  // MatrixEntry(e._2, 10, item._2) // <- This works
>         })
>     })
> )
> val rm = cm.toRowMatrix
> val simMatrix = rm.columnSimilarities()
>
> I would like to make this work in the Spark shell as I am still exploring
> the data. Let me know if there is an alternate way of constructing the
> RowMatrix.
>
> Thanks and appreciate all the help!
>
> Ashish
>
> On Mon, Aug 31, 2015 at 3:41 AM Sean Owen <so...@cloudera.com> wrote:
>>
>> Yeah I see that now. I think it fails immediately because the map
>> operation does try to clean and/or verify the serialization of the
>> closure upfront.
>>
>> I'm not quite sure what is going on, but I think it's some strange
>> interaction between how you're building up the list and what the
>> resulting representation happens to be like, and how the closure
>> cleaner works, which can't be perfect. The shell also introduces an
>> extra layer of issues.
>>
>> For example, the slightly more canonical approaches work fine:
>>
>> import scala.collection.mutable.MutableList
>> val lst = MutableList[(String,String,Double)]()
>> (0 to 10000).foreach(i => lst :+ ("10", "10", i.toDouble))
>>
>> or just
>>
>> val lst = (0 to 10000).map(i => ("10", "10", i.toDouble))
>>
>> If you just need this to work, maybe those are better alternatives anyway.
>> You can also check whether it works without the shell, as I suspect
>> that's a factor.
>>
>> It's not an error in Spark per se but saying that something's default
>> Java serialization graph is very deep, so it's like the code you wrote
>> plus the closure cleaner ends up pulling in some huge linked list and
>> serializing it the direct and unuseful way.
>>
>> If you have an idea about exactly why it's happening you can open a
>> JIRA, but arguably it's something that's nice to just work but isn't
>> to do with Spark per se. Or, have a look at others related to the
>> closure and shell and you may find this is related to other known
>> behavior.
>>
>>
>> On Sun, Aug 30, 2015 at 8:08 PM, Ashish Shrowty
>> <as...@gmail.com> wrote:
>> > Sean .. does the code below work for you in the Spark shell? Ted got the
>> > same error -
>> >
>> > val a=10
>> > val lst = MutableList[(String,String,Double)]()
>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >
>> > -Ashish
>> >
>> >
>> > On Sun, Aug 30, 2015 at 2:52 PM Sean Owen <so...@cloudera.com> wrote:
>> >>
>> >> I'm not sure how to reproduce it? this code does not produce an error
>> >> in
>> >> master.
>> >>
>> >> On Sun, Aug 30, 2015 at 7:26 PM, Ashish Shrowty
>> >> <as...@gmail.com> wrote:
>> >> > Do you think I should create a JIRA?
>> >> >
>> >> >
>> >> > On Sun, Aug 30, 2015 at 12:56 PM Ted Yu <yu...@gmail.com> wrote:
>> >> >>
>> >> >> I got StackOverFlowError as well :-(
>> >> >>
>> >> >> On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty
>> >> >> <as...@gmail.com>
>> >> >> wrote:
>> >> >>>
>> >> >>> Yep .. I tried that too earlier. Doesn't make a difference. Are you
>> >> >>> able
>> >> >>> to replicate on your side?
>> >> >>>
>> >> >>>
>> >> >>> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <yu...@gmail.com>
>> >> >>> wrote:
>> >> >>>>
>> >> >>>> I see.
>> >> >>>>
>> >> >>>> What about using the following in place of variable a ?
>> >> >>>>
>> >> >>>>
>> >> >>>>
>> >> >>>> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>> >> >>>>
>> >> >>>> Cheers
>> >> >>>>
>> >> >>>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty
>> >> >>>> <as...@gmail.com> wrote:
>> >> >>>>>
>> >> >>>>> @Sean - Agree that there is no action, but I still get the
>> >> >>>>> stackoverflowerror, its very weird
>> >> >>>>>
>> >> >>>>> @Ted - Variable a is just an int - val a = 10 ... The error
>> >> >>>>> happens
>> >> >>>>> when I try to pass a variable into the closure. The example you
>> >> >>>>> have
>> >> >>>>> above
>> >> >>>>> works fine since there is no variable being passed into the
>> >> >>>>> closure
>> >> >>>>> from the
>> >> >>>>> shell.
>> >> >>>>>
>> >> >>>>> -Ashish
>> >> >>>>>
>> >> >>>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yu...@gmail.com>
>> >> >>>>> wrote:
>> >> >>>>>>
>> >> >>>>>> Using Spark shell :
>> >> >>>>>>
>> >> >>>>>> scala> import scala.collection.mutable.MutableList
>> >> >>>>>> import scala.collection.mutable.MutableList
>> >> >>>>>>
>> >> >>>>>> scala> val lst = MutableList[(String,String,Double)]()
>> >> >>>>>> lst: scala.collection.mutable.MutableList[(String, String,
>> >> >>>>>> Double)]
>> >> >>>>>> =
>> >> >>>>>> MutableList()
>> >> >>>>>>
>> >> >>>>>> scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>> >> >>>>>>
>> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >>>>>> <console>:27: error: not found: value a
>> >> >>>>>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >>>>>>                                           ^
>> >> >>>>>>
>> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
>> >> >>>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map
>> >> >>>>>> at
>> >> >>>>>> <console>:27
>> >> >>>>>>
>> >> >>>>>> scala> rdd.count()
>> >> >>>>>> ...
>> >> >>>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at
>> >> >>>>>> <console>:30, took 0.478350 s
>> >> >>>>>> res1: Long = 10000
>> >> >>>>>>
>> >> >>>>>> Ashish:
>> >> >>>>>> Please refine your example to mimic more closely what your code
>> >> >>>>>> actually did.
>> >> >>>>>>
>> >> >>>>>> Thanks
>> >> >>>>>>
>> >> >>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <so...@cloudera.com>
>> >> >>>>>> wrote:
>> >> >>>>>>>
>> >> >>>>>>> That can't cause any error, since there is no action in your
>> >> >>>>>>> first
>> >> >>>>>>> snippet. Even calling count on the result doesn't cause an
>> >> >>>>>>> error.
>> >> >>>>>>> You
>> >> >>>>>>> must be executing something different.
>> >> >>>>>>>
>> >> >>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty
>> >> >>>>>>> <as...@gmail.com>
>> >> >>>>>>> wrote:
>> >> >>>>>>> > I am running the Spark shell (1.2.1) in local mode and I have
>> >> >>>>>>> > a
>> >> >>>>>>> > simple
>> >> >>>>>>> > RDD[(String,String,Double)] with about 10,000 objects in it.
>> >> >>>>>>> > I
>> >> >>>>>>> > get
>> >> >>>>>>> > a
>> >> >>>>>>> > StackOverFlowError each time I try to run the following code
>> >> >>>>>>> > (the
>> >> >>>>>>> > code
>> >> >>>>>>> > itself is just representative of other logic where I need to
>> >> >>>>>>> > pass
>> >> >>>>>>> > in a
>> >> >>>>>>> > variable). I tried broadcasting the variable too, but no luck
>> >> >>>>>>> > ..
>> >> >>>>>>> > missing
>> >> >>>>>>> > something basic here -
>> >> >>>>>>> >
>> >> >>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>)
>> >> >>>>>>> > val a=10
>> >> >>>>>>> > rdd.map(r => if (a==10) 1 else 0)
>> >> >>>>>>> > This throws -
>> >> >>>>>>> >
>> >> >>>>>>> > java.lang.StackOverflowError
>> >> >>>>>>> >     at
>> >> >>>>>>> > java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
>> >> >>>>>>> >     at
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
>> >> >>>>>>> >     at
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> >> >>>>>>> >     at
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> >> >>>>>>> >     at
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> >> >>>>>>> >     at
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>> >> >>>>>>> >     at
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> >> >>>>>>> >     at
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> >> >>>>>>> >     at
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> >> >>>>>>> > ...
>> >> >>>>>>> > ...
>> >> >>>>>>> >
>> >> >>>>>>> > More experiments  .. this works -
>> >> >>>>>>> >
>> >> >>>>>>> > val lst = Range(0,10000).map(i=>("10","10",i:Double)).toList
>> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >>>>>>> >
>> >> >>>>>>> > But below doesn't and throws the StackoverflowError -
>> >> >>>>>>> >
>> >> >>>>>>> > val lst = MutableList[(String,String,Double)]()
>> >> >>>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >>>>>>> >
>> >> >>>>>>> > Any help appreciated!
>> >> >>>>>>> >
>> >> >>>>>>> > Thanks,
>> >> >>>>>>> > Ashish
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> > --
>> >> >>>>>>> > View this message in context:
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
>> >> >>>>>>> > Sent from the Apache Spark User List mailing list archive at
>> >> >>>>>>> > Nabble.com.
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> >
>> >> >>>>>>> > ---------------------------------------------------------------------
>> >> >>>>>>> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >> >>>>>>> > For additional commands, e-mail: user-help@spark.apache.org
>> >> >>>>>>> >
>> >> >>>>>>>
>> >> >>>>>>>
>> >> >>>>>>>
>> >> >>>>>>> ---------------------------------------------------------------------
>> >> >>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >> >>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>> >> >>>>>>>
>> >> >>>>>>
>> >> >>>>
>> >> >>
>> >> >

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark shell and StackOverFlowError

Posted by Ashish Shrowty <as...@gmail.com>.
Sean,

Thanks for your comments. What I was really trying to do was to transform a
RDD[(userid,itemid,ratings)] into a RowMatrix so that I can do some column
similarity calculations while exploring the data before building some
models. But to do that I need to first convert the user and item ids into
respective indexes where I intended on passing in an array into the
closure, which is where I got stuck with this overflowerror trying to
figure out where it is happening. The actual error I got was slightly
different (Caused by: java.io.NotSerializableException:
java.io.ObjectInputStream). I started investigating this issue which led me
to the earlier code snippet that I had posted. This is again because of the
bcItemsIdx variable being passed into the closure. Below code works if I
don't pass in the variable and use simply a constant like 10 in its place
.. The code thus far -

// rdd below is RDD[(String,String,Double)]
// bcItemsIdx below is Broadcast[Array[String]] which is an array of item
ids
val gRdd = rdd.map{case(user,item,rating) =>
((user),(item,rating))}.groupByKey
val idxRdd = gRdd.zipWithIndex
val cm = new CoordinateMatrix(
    idxRdd.flatMap[MatrixEntry](e => {
        e._1._2.map(item=> {
                 MatrixEntry(e._2, bcItemsIdx.value.indexOf(item._1),
item._2) // <- This is where I get the Serialization error passing in the
index
                 // MatrixEntry(e._2, 10, item._2) // <- This works
        })
    })
)
val rm = cm.toRowMatrix
val simMatrix = rm.columnSimilarities()

I would like to make this work in the Spark shell as I am still exploring
the data. Let me know if there is an alternate way of constructing the
RowMatrix.

Thanks and appreciate all the help!

Ashish

On Mon, Aug 31, 2015 at 3:41 AM Sean Owen <so...@cloudera.com> wrote:

> Yeah I see that now. I think it fails immediately because the map
> operation does try to clean and/or verify the serialization of the
> closure upfront.
>
> I'm not quite sure what is going on, but I think it's some strange
> interaction between how you're building up the list and what the
> resulting representation happens to be like, and how the closure
> cleaner works, which can't be perfect. The shell also introduces an
> extra layer of issues.
>
> For example, the slightly more canonical approaches work fine:
>
> import scala.collection.mutable.MutableList
> val lst = MutableList[(String,String,Double)]()
> (0 to 10000).foreach(i => lst :+ ("10", "10", i.toDouble))
>
> or just
>
> val lst = (0 to 10000).map(i => ("10", "10", i.toDouble))
>
> If you just need this to work, maybe those are better alternatives anyway.
> You can also check whether it works without the shell, as I suspect
> that's a factor.
>
> It's not an error in Spark per se but saying that something's default
> Java serialization graph is very deep, so it's like the code you wrote
> plus the closure cleaner ends up pulling in some huge linked list and
> serializing it the direct and unuseful way.
>
> If you have an idea about exactly why it's happening you can open a
> JIRA, but arguably it's something that's nice to just work but isn't
> to do with Spark per se. Or, have a look at others related to the
> closure and shell and you may find this is related to other known
> behavior.
>
>
> On Sun, Aug 30, 2015 at 8:08 PM, Ashish Shrowty
> <as...@gmail.com> wrote:
> > Sean .. does the code below work for you in the Spark shell? Ted got the
> > same error -
> >
> > val a=10
> > val lst = MutableList[(String,String,Double)]()
> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >
> > -Ashish
> >
> >
> > On Sun, Aug 30, 2015 at 2:52 PM Sean Owen <so...@cloudera.com> wrote:
> >>
> >> I'm not sure how to reproduce it? this code does not produce an error in
> >> master.
> >>
> >> On Sun, Aug 30, 2015 at 7:26 PM, Ashish Shrowty
> >> <as...@gmail.com> wrote:
> >> > Do you think I should create a JIRA?
> >> >
> >> >
> >> > On Sun, Aug 30, 2015 at 12:56 PM Ted Yu <yu...@gmail.com> wrote:
> >> >>
> >> >> I got StackOverFlowError as well :-(
> >> >>
> >> >> On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty
> >> >> <as...@gmail.com>
> >> >> wrote:
> >> >>>
> >> >>> Yep .. I tried that too earlier. Doesn't make a difference. Are you
> >> >>> able
> >> >>> to replicate on your side?
> >> >>>
> >> >>>
> >> >>> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <yu...@gmail.com>
> wrote:
> >> >>>>
> >> >>>> I see.
> >> >>>>
> >> >>>> What about using the following in place of variable a ?
> >> >>>>
> >> >>>>
> >> >>>>
> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
> >> >>>>
> >> >>>> Cheers
> >> >>>>
> >> >>>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty
> >> >>>> <as...@gmail.com> wrote:
> >> >>>>>
> >> >>>>> @Sean - Agree that there is no action, but I still get the
> >> >>>>> stackoverflowerror, its very weird
> >> >>>>>
> >> >>>>> @Ted - Variable a is just an int - val a = 10 ... The error
> happens
> >> >>>>> when I try to pass a variable into the closure. The example you
> have
> >> >>>>> above
> >> >>>>> works fine since there is no variable being passed into the
> closure
> >> >>>>> from the
> >> >>>>> shell.
> >> >>>>>
> >> >>>>> -Ashish
> >> >>>>>
> >> >>>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yu...@gmail.com>
> wrote:
> >> >>>>>>
> >> >>>>>> Using Spark shell :
> >> >>>>>>
> >> >>>>>> scala> import scala.collection.mutable.MutableList
> >> >>>>>> import scala.collection.mutable.MutableList
> >> >>>>>>
> >> >>>>>> scala> val lst = MutableList[(String,String,Double)]()
> >> >>>>>> lst: scala.collection.mutable.MutableList[(String, String,
> Double)]
> >> >>>>>> =
> >> >>>>>> MutableList()
> >> >>>>>>
> >> >>>>>> scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
> >> >>>>>>
> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >> >>>>>> <console>:27: error: not found: value a
> >> >>>>>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >> >>>>>>                                           ^
> >> >>>>>>
> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
> >> >>>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map
> at
> >> >>>>>> <console>:27
> >> >>>>>>
> >> >>>>>> scala> rdd.count()
> >> >>>>>> ...
> >> >>>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at
> >> >>>>>> <console>:30, took 0.478350 s
> >> >>>>>> res1: Long = 10000
> >> >>>>>>
> >> >>>>>> Ashish:
> >> >>>>>> Please refine your example to mimic more closely what your code
> >> >>>>>> actually did.
> >> >>>>>>
> >> >>>>>> Thanks
> >> >>>>>>
> >> >>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <so...@cloudera.com>
> >> >>>>>> wrote:
> >> >>>>>>>
> >> >>>>>>> That can't cause any error, since there is no action in your
> first
> >> >>>>>>> snippet. Even calling count on the result doesn't cause an
> error.
> >> >>>>>>> You
> >> >>>>>>> must be executing something different.
> >> >>>>>>>
> >> >>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty
> >> >>>>>>> <as...@gmail.com>
> >> >>>>>>> wrote:
> >> >>>>>>> > I am running the Spark shell (1.2.1) in local mode and I have
> a
> >> >>>>>>> > simple
> >> >>>>>>> > RDD[(String,String,Double)] with about 10,000 objects in it. I
> >> >>>>>>> > get
> >> >>>>>>> > a
> >> >>>>>>> > StackOverFlowError each time I try to run the following code
> >> >>>>>>> > (the
> >> >>>>>>> > code
> >> >>>>>>> > itself is just representative of other logic where I need to
> >> >>>>>>> > pass
> >> >>>>>>> > in a
> >> >>>>>>> > variable). I tried broadcasting the variable too, but no luck
> ..
> >> >>>>>>> > missing
> >> >>>>>>> > something basic here -
> >> >>>>>>> >
> >> >>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>)
> >> >>>>>>> > val a=10
> >> >>>>>>> > rdd.map(r => if (a==10) 1 else 0)
> >> >>>>>>> > This throws -
> >> >>>>>>> >
> >> >>>>>>> > java.lang.StackOverflowError
> >> >>>>>>> >     at
> >> >>>>>>> > java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
> >> >>>>>>> >     at
> >> >>>>>>> >
> >> >>>>>>> >
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
> >> >>>>>>> >     at
> >> >>>>>>> >
> >> >>>>>>> >
> >> >>>>>>> >
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> >> >>>>>>> >     at
> >> >>>>>>> >
> >> >>>>>>> >
> >> >>>>>>> >
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> >> >>>>>>> >     at
> >> >>>>>>> >
> >> >>>>>>> >
> >> >>>>>>> >
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> >> >>>>>>> >     at
> >> >>>>>>> >
> >> >>>>>>> >
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> >> >>>>>>> >     at
> >> >>>>>>> >
> >> >>>>>>> >
> >> >>>>>>> >
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> >> >>>>>>> >     at
> >> >>>>>>> >
> >> >>>>>>> >
> >> >>>>>>> >
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> >> >>>>>>> >     at
> >> >>>>>>> >
> >> >>>>>>> >
> >> >>>>>>> >
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> >> >>>>>>> > ...
> >> >>>>>>> > ...
> >> >>>>>>> >
> >> >>>>>>> > More experiments  .. this works -
> >> >>>>>>> >
> >> >>>>>>> > val lst = Range(0,10000).map(i=>("10","10",i:Double)).toList
> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >> >>>>>>> >
> >> >>>>>>> > But below doesn't and throws the StackoverflowError -
> >> >>>>>>> >
> >> >>>>>>> > val lst = MutableList[(String,String,Double)]()
> >> >>>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >> >>>>>>> >
> >> >>>>>>> > Any help appreciated!
> >> >>>>>>> >
> >> >>>>>>> > Thanks,
> >> >>>>>>> > Ashish
> >> >>>>>>> >
> >> >>>>>>> >
> >> >>>>>>> >
> >> >>>>>>> > --
> >> >>>>>>> > View this message in context:
> >> >>>>>>> >
> >> >>>>>>> >
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
> >> >>>>>>> > Sent from the Apache Spark User List mailing list archive at
> >> >>>>>>> > Nabble.com.
> >> >>>>>>> >
> >> >>>>>>> >
> >> >>>>>>> >
> >> >>>>>>> >
> ---------------------------------------------------------------------
> >> >>>>>>> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> >> >>>>>>> > For additional commands, e-mail: user-help@spark.apache.org
> >> >>>>>>> >
> >> >>>>>>>
> >> >>>>>>>
> >> >>>>>>>
> ---------------------------------------------------------------------
> >> >>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> >> >>>>>>> For additional commands, e-mail: user-help@spark.apache.org
> >> >>>>>>>
> >> >>>>>>
> >> >>>>
> >> >>
> >> >
>

Re: Spark shell and StackOverFlowError

Posted by Sean Owen <so...@cloudera.com>.
Yeah I see that now. I think it fails immediately because the map
operation does try to clean and/or verify the serialization of the
closure upfront.

I'm not quite sure what is going on, but I think it's some strange
interaction between how you're building up the list and what the
resulting representation happens to be like, and how the closure
cleaner works, which can't be perfect. The shell also introduces an
extra layer of issues.

For example, the slightly more canonical approaches work fine:

import scala.collection.mutable.MutableList
val lst = MutableList[(String,String,Double)]()
(0 to 10000).foreach(i => lst :+ ("10", "10", i.toDouble))

or just

val lst = (0 to 10000).map(i => ("10", "10", i.toDouble))

If you just need this to work, maybe those are better alternatives anyway.
You can also check whether it works without the shell, as I suspect
that's a factor.

It's not an error in Spark per se but saying that something's default
Java serialization graph is very deep, so it's like the code you wrote
plus the closure cleaner ends up pulling in some huge linked list and
serializing it the direct and unuseful way.

If you have an idea about exactly why it's happening you can open a
JIRA, but arguably it's something that's nice to just work but isn't
to do with Spark per se. Or, have a look at others related to the
closure and shell and you may find this is related to other known
behavior.


On Sun, Aug 30, 2015 at 8:08 PM, Ashish Shrowty
<as...@gmail.com> wrote:
> Sean .. does the code below work for you in the Spark shell? Ted got the
> same error -
>
> val a=10
> val lst = MutableList[(String,String,Double)]()
> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
> sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>
> -Ashish
>
>
> On Sun, Aug 30, 2015 at 2:52 PM Sean Owen <so...@cloudera.com> wrote:
>>
>> I'm not sure how to reproduce it? this code does not produce an error in
>> master.
>>
>> On Sun, Aug 30, 2015 at 7:26 PM, Ashish Shrowty
>> <as...@gmail.com> wrote:
>> > Do you think I should create a JIRA?
>> >
>> >
>> > On Sun, Aug 30, 2015 at 12:56 PM Ted Yu <yu...@gmail.com> wrote:
>> >>
>> >> I got StackOverFlowError as well :-(
>> >>
>> >> On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty
>> >> <as...@gmail.com>
>> >> wrote:
>> >>>
>> >>> Yep .. I tried that too earlier. Doesn't make a difference. Are you
>> >>> able
>> >>> to replicate on your side?
>> >>>
>> >>>
>> >>> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <yu...@gmail.com> wrote:
>> >>>>
>> >>>> I see.
>> >>>>
>> >>>> What about using the following in place of variable a ?
>> >>>>
>> >>>>
>> >>>> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>> >>>>
>> >>>> Cheers
>> >>>>
>> >>>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty
>> >>>> <as...@gmail.com> wrote:
>> >>>>>
>> >>>>> @Sean - Agree that there is no action, but I still get the
>> >>>>> stackoverflowerror, its very weird
>> >>>>>
>> >>>>> @Ted - Variable a is just an int - val a = 10 ... The error happens
>> >>>>> when I try to pass a variable into the closure. The example you have
>> >>>>> above
>> >>>>> works fine since there is no variable being passed into the closure
>> >>>>> from the
>> >>>>> shell.
>> >>>>>
>> >>>>> -Ashish
>> >>>>>
>> >>>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yu...@gmail.com> wrote:
>> >>>>>>
>> >>>>>> Using Spark shell :
>> >>>>>>
>> >>>>>> scala> import scala.collection.mutable.MutableList
>> >>>>>> import scala.collection.mutable.MutableList
>> >>>>>>
>> >>>>>> scala> val lst = MutableList[(String,String,Double)]()
>> >>>>>> lst: scala.collection.mutable.MutableList[(String, String, Double)]
>> >>>>>> =
>> >>>>>> MutableList()
>> >>>>>>
>> >>>>>> scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>> >>>>>>
>> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >>>>>> <console>:27: error: not found: value a
>> >>>>>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >>>>>>                                           ^
>> >>>>>>
>> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
>> >>>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at
>> >>>>>> <console>:27
>> >>>>>>
>> >>>>>> scala> rdd.count()
>> >>>>>> ...
>> >>>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at
>> >>>>>> <console>:30, took 0.478350 s
>> >>>>>> res1: Long = 10000
>> >>>>>>
>> >>>>>> Ashish:
>> >>>>>> Please refine your example to mimic more closely what your code
>> >>>>>> actually did.
>> >>>>>>
>> >>>>>> Thanks
>> >>>>>>
>> >>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <so...@cloudera.com>
>> >>>>>> wrote:
>> >>>>>>>
>> >>>>>>> That can't cause any error, since there is no action in your first
>> >>>>>>> snippet. Even calling count on the result doesn't cause an error.
>> >>>>>>> You
>> >>>>>>> must be executing something different.
>> >>>>>>>
>> >>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty
>> >>>>>>> <as...@gmail.com>
>> >>>>>>> wrote:
>> >>>>>>> > I am running the Spark shell (1.2.1) in local mode and I have a
>> >>>>>>> > simple
>> >>>>>>> > RDD[(String,String,Double)] with about 10,000 objects in it. I
>> >>>>>>> > get
>> >>>>>>> > a
>> >>>>>>> > StackOverFlowError each time I try to run the following code
>> >>>>>>> > (the
>> >>>>>>> > code
>> >>>>>>> > itself is just representative of other logic where I need to
>> >>>>>>> > pass
>> >>>>>>> > in a
>> >>>>>>> > variable). I tried broadcasting the variable too, but no luck ..
>> >>>>>>> > missing
>> >>>>>>> > something basic here -
>> >>>>>>> >
>> >>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>)
>> >>>>>>> > val a=10
>> >>>>>>> > rdd.map(r => if (a==10) 1 else 0)
>> >>>>>>> > This throws -
>> >>>>>>> >
>> >>>>>>> > java.lang.StackOverflowError
>> >>>>>>> >     at
>> >>>>>>> > java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
>> >>>>>>> >     at
>> >>>>>>> >
>> >>>>>>> > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
>> >>>>>>> >     at
>> >>>>>>> >
>> >>>>>>> >
>> >>>>>>> > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> >>>>>>> >     at
>> >>>>>>> >
>> >>>>>>> >
>> >>>>>>> > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> >>>>>>> >     at
>> >>>>>>> >
>> >>>>>>> >
>> >>>>>>> > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> >>>>>>> >     at
>> >>>>>>> >
>> >>>>>>> > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>> >>>>>>> >     at
>> >>>>>>> >
>> >>>>>>> >
>> >>>>>>> > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> >>>>>>> >     at
>> >>>>>>> >
>> >>>>>>> >
>> >>>>>>> > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> >>>>>>> >     at
>> >>>>>>> >
>> >>>>>>> >
>> >>>>>>> > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> >>>>>>> > ...
>> >>>>>>> > ...
>> >>>>>>> >
>> >>>>>>> > More experiments  .. this works -
>> >>>>>>> >
>> >>>>>>> > val lst = Range(0,10000).map(i=>("10","10",i:Double)).toList
>> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >>>>>>> >
>> >>>>>>> > But below doesn't and throws the StackoverflowError -
>> >>>>>>> >
>> >>>>>>> > val lst = MutableList[(String,String,Double)]()
>> >>>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >>>>>>> >
>> >>>>>>> > Any help appreciated!
>> >>>>>>> >
>> >>>>>>> > Thanks,
>> >>>>>>> > Ashish
>> >>>>>>> >
>> >>>>>>> >
>> >>>>>>> >
>> >>>>>>> > --
>> >>>>>>> > View this message in context:
>> >>>>>>> >
>> >>>>>>> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
>> >>>>>>> > Sent from the Apache Spark User List mailing list archive at
>> >>>>>>> > Nabble.com.
>> >>>>>>> >
>> >>>>>>> >
>> >>>>>>> >
>> >>>>>>> > ---------------------------------------------------------------------
>> >>>>>>> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >>>>>>> > For additional commands, e-mail: user-help@spark.apache.org
>> >>>>>>> >
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> ---------------------------------------------------------------------
>> >>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>> >>>>>>>
>> >>>>>>
>> >>>>
>> >>
>> >

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark shell and StackOverFlowError

Posted by Ashish Shrowty <as...@gmail.com>.
Sean .. does the code below work for you in the Spark shell? Ted got the
same error -

val a=10
val lst = MutableList[(String,String,Double)]()
Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)

-Ashish


On Sun, Aug 30, 2015 at 2:52 PM Sean Owen <so...@cloudera.com> wrote:

> I'm not sure how to reproduce it? this code does not produce an error in
> master.
>
> On Sun, Aug 30, 2015 at 7:26 PM, Ashish Shrowty
> <as...@gmail.com> wrote:
> > Do you think I should create a JIRA?
> >
> >
> > On Sun, Aug 30, 2015 at 12:56 PM Ted Yu <yu...@gmail.com> wrote:
> >>
> >> I got StackOverFlowError as well :-(
> >>
> >> On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty <
> ashish.shrowty@gmail.com>
> >> wrote:
> >>>
> >>> Yep .. I tried that too earlier. Doesn't make a difference. Are you
> able
> >>> to replicate on your side?
> >>>
> >>>
> >>> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <yu...@gmail.com> wrote:
> >>>>
> >>>> I see.
> >>>>
> >>>> What about using the following in place of variable a ?
> >>>>
> >>>>
> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
> >>>>
> >>>> Cheers
> >>>>
> >>>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty
> >>>> <as...@gmail.com> wrote:
> >>>>>
> >>>>> @Sean - Agree that there is no action, but I still get the
> >>>>> stackoverflowerror, its very weird
> >>>>>
> >>>>> @Ted - Variable a is just an int - val a = 10 ... The error happens
> >>>>> when I try to pass a variable into the closure. The example you have
> above
> >>>>> works fine since there is no variable being passed into the closure
> from the
> >>>>> shell.
> >>>>>
> >>>>> -Ashish
> >>>>>
> >>>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yu...@gmail.com> wrote:
> >>>>>>
> >>>>>> Using Spark shell :
> >>>>>>
> >>>>>> scala> import scala.collection.mutable.MutableList
> >>>>>> import scala.collection.mutable.MutableList
> >>>>>>
> >>>>>> scala> val lst = MutableList[(String,String,Double)]()
> >>>>>> lst: scala.collection.mutable.MutableList[(String, String, Double)]
> =
> >>>>>> MutableList()
> >>>>>>
> >>>>>> scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
> >>>>>>
> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >>>>>> <console>:27: error: not found: value a
> >>>>>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >>>>>>                                           ^
> >>>>>>
> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
> >>>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at
> >>>>>> <console>:27
> >>>>>>
> >>>>>> scala> rdd.count()
> >>>>>> ...
> >>>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at
> >>>>>> <console>:30, took 0.478350 s
> >>>>>> res1: Long = 10000
> >>>>>>
> >>>>>> Ashish:
> >>>>>> Please refine your example to mimic more closely what your code
> >>>>>> actually did.
> >>>>>>
> >>>>>> Thanks
> >>>>>>
> >>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <so...@cloudera.com>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> That can't cause any error, since there is no action in your first
> >>>>>>> snippet. Even calling count on the result doesn't cause an error.
> You
> >>>>>>> must be executing something different.
> >>>>>>>
> >>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty <
> ashish.shrowty@gmail.com>
> >>>>>>> wrote:
> >>>>>>> > I am running the Spark shell (1.2.1) in local mode and I have a
> >>>>>>> > simple
> >>>>>>> > RDD[(String,String,Double)] with about 10,000 objects in it. I
> get
> >>>>>>> > a
> >>>>>>> > StackOverFlowError each time I try to run the following code (the
> >>>>>>> > code
> >>>>>>> > itself is just representative of other logic where I need to pass
> >>>>>>> > in a
> >>>>>>> > variable). I tried broadcasting the variable too, but no luck ..
> >>>>>>> > missing
> >>>>>>> > something basic here -
> >>>>>>> >
> >>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>)
> >>>>>>> > val a=10
> >>>>>>> > rdd.map(r => if (a==10) 1 else 0)
> >>>>>>> > This throws -
> >>>>>>> >
> >>>>>>> > java.lang.StackOverflowError
> >>>>>>> >     at
> java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
> >>>>>>> >     at
> >>>>>>> >
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
> >>>>>>> >     at
> >>>>>>> >
> >>>>>>> >
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> >>>>>>> >     at
> >>>>>>> >
> >>>>>>> >
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> >>>>>>> >     at
> >>>>>>> >
> >>>>>>> >
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> >>>>>>> >     at
> >>>>>>> >
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> >>>>>>> >     at
> >>>>>>> >
> >>>>>>> >
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> >>>>>>> >     at
> >>>>>>> >
> >>>>>>> >
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> >>>>>>> >     at
> >>>>>>> >
> >>>>>>> >
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> >>>>>>> > ...
> >>>>>>> > ...
> >>>>>>> >
> >>>>>>> > More experiments  .. this works -
> >>>>>>> >
> >>>>>>> > val lst = Range(0,10000).map(i=>("10","10",i:Double)).toList
> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >>>>>>> >
> >>>>>>> > But below doesn't and throws the StackoverflowError -
> >>>>>>> >
> >>>>>>> > val lst = MutableList[(String,String,Double)]()
> >>>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >>>>>>> >
> >>>>>>> > Any help appreciated!
> >>>>>>> >
> >>>>>>> > Thanks,
> >>>>>>> > Ashish
> >>>>>>> >
> >>>>>>> >
> >>>>>>> >
> >>>>>>> > --
> >>>>>>> > View this message in context:
> >>>>>>> >
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
> >>>>>>> > Sent from the Apache Spark User List mailing list archive at
> >>>>>>> > Nabble.com.
> >>>>>>> >
> >>>>>>> >
> >>>>>>> >
> ---------------------------------------------------------------------
> >>>>>>> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> >>>>>>> > For additional commands, e-mail: user-help@spark.apache.org
> >>>>>>> >
> >>>>>>>
> >>>>>>>
> ---------------------------------------------------------------------
> >>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> >>>>>>> For additional commands, e-mail: user-help@spark.apache.org
> >>>>>>>
> >>>>>>
> >>>>
> >>
> >
>

Re: Spark shell and StackOverFlowError

Posted by Sean Owen <so...@cloudera.com>.
I'm not sure how to reproduce it? this code does not produce an error in master.

On Sun, Aug 30, 2015 at 7:26 PM, Ashish Shrowty
<as...@gmail.com> wrote:
> Do you think I should create a JIRA?
>
>
> On Sun, Aug 30, 2015 at 12:56 PM Ted Yu <yu...@gmail.com> wrote:
>>
>> I got StackOverFlowError as well :-(
>>
>> On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty <as...@gmail.com>
>> wrote:
>>>
>>> Yep .. I tried that too earlier. Doesn't make a difference. Are you able
>>> to replicate on your side?
>>>
>>>
>>> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <yu...@gmail.com> wrote:
>>>>
>>>> I see.
>>>>
>>>> What about using the following in place of variable a ?
>>>>
>>>> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>>>>
>>>> Cheers
>>>>
>>>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty
>>>> <as...@gmail.com> wrote:
>>>>>
>>>>> @Sean - Agree that there is no action, but I still get the
>>>>> stackoverflowerror, its very weird
>>>>>
>>>>> @Ted - Variable a is just an int - val a = 10 ... The error happens
>>>>> when I try to pass a variable into the closure. The example you have above
>>>>> works fine since there is no variable being passed into the closure from the
>>>>> shell.
>>>>>
>>>>> -Ashish
>>>>>
>>>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yu...@gmail.com> wrote:
>>>>>>
>>>>>> Using Spark shell :
>>>>>>
>>>>>> scala> import scala.collection.mutable.MutableList
>>>>>> import scala.collection.mutable.MutableList
>>>>>>
>>>>>> scala> val lst = MutableList[(String,String,Double)]()
>>>>>> lst: scala.collection.mutable.MutableList[(String, String, Double)] =
>>>>>> MutableList()
>>>>>>
>>>>>> scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>>>>>
>>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>>> <console>:27: error: not found: value a
>>>>>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>>>                                           ^
>>>>>>
>>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
>>>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at
>>>>>> <console>:27
>>>>>>
>>>>>> scala> rdd.count()
>>>>>> ...
>>>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at
>>>>>> <console>:30, took 0.478350 s
>>>>>> res1: Long = 10000
>>>>>>
>>>>>> Ashish:
>>>>>> Please refine your example to mimic more closely what your code
>>>>>> actually did.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <so...@cloudera.com>
>>>>>> wrote:
>>>>>>>
>>>>>>> That can't cause any error, since there is no action in your first
>>>>>>> snippet. Even calling count on the result doesn't cause an error. You
>>>>>>> must be executing something different.
>>>>>>>
>>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty <as...@gmail.com>
>>>>>>> wrote:
>>>>>>> > I am running the Spark shell (1.2.1) in local mode and I have a
>>>>>>> > simple
>>>>>>> > RDD[(String,String,Double)] with about 10,000 objects in it. I get
>>>>>>> > a
>>>>>>> > StackOverFlowError each time I try to run the following code (the
>>>>>>> > code
>>>>>>> > itself is just representative of other logic where I need to pass
>>>>>>> > in a
>>>>>>> > variable). I tried broadcasting the variable too, but no luck ..
>>>>>>> > missing
>>>>>>> > something basic here -
>>>>>>> >
>>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>)
>>>>>>> > val a=10
>>>>>>> > rdd.map(r => if (a==10) 1 else 0)
>>>>>>> > This throws -
>>>>>>> >
>>>>>>> > java.lang.StackOverflowError
>>>>>>> >     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
>>>>>>> >     at
>>>>>>> > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
>>>>>>> >     at
>>>>>>> >
>>>>>>> > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>> >     at
>>>>>>> >
>>>>>>> > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>> >     at
>>>>>>> >
>>>>>>> > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>> >     at
>>>>>>> > java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>> >     at
>>>>>>> >
>>>>>>> > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>> >     at
>>>>>>> >
>>>>>>> > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>> >     at
>>>>>>> >
>>>>>>> > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>> > ...
>>>>>>> > ...
>>>>>>> >
>>>>>>> > More experiments  .. this works -
>>>>>>> >
>>>>>>> > val lst = Range(0,10000).map(i=>("10","10",i:Double)).toList
>>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>>>> >
>>>>>>> > But below doesn't and throws the StackoverflowError -
>>>>>>> >
>>>>>>> > val lst = MutableList[(String,String,Double)]()
>>>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>>>> >
>>>>>>> > Any help appreciated!
>>>>>>> >
>>>>>>> > Thanks,
>>>>>>> > Ashish
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > --
>>>>>>> > View this message in context:
>>>>>>> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
>>>>>>> > Sent from the Apache Spark User List mailing list archive at
>>>>>>> > Nabble.com.
>>>>>>> >
>>>>>>> >
>>>>>>> > ---------------------------------------------------------------------
>>>>>>> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>>> > For additional commands, e-mail: user-help@spark.apache.org
>>>>>>> >
>>>>>>>
>>>>>>> ---------------------------------------------------------------------
>>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>>>
>>>>>>
>>>>
>>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark shell and StackOverFlowError

Posted by Ashish Shrowty <as...@gmail.com>.
Do you think I should create a JIRA?


On Sun, Aug 30, 2015 at 12:56 PM Ted Yu <yu...@gmail.com> wrote:

> I got StackOverFlowError as well :-(
>
> On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty <as...@gmail.com>
> wrote:
>
>> Yep .. I tried that too earlier. Doesn't make a difference. Are you able
>> to replicate on your side?
>>
>>
>> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <yu...@gmail.com> wrote:
>>
>>> I see.
>>>
>>> What about using the following in place of variable a ?
>>>
>>> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>>>
>>> Cheers
>>>
>>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty <
>>> ashish.shrowty@gmail.com> wrote:
>>>
>>>> @Sean - Agree that there is no action, but I still get the
>>>> stackoverflowerror, its very weird
>>>>
>>>> @Ted - Variable a is just an int - val a = 10 ... The error happens
>>>> when I try to pass a variable into the closure. The example you have above
>>>> works fine since there is no variable being passed into the closure from
>>>> the shell.
>>>>
>>>> -Ashish
>>>>
>>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yu...@gmail.com> wrote:
>>>>
>>>>> Using Spark shell :
>>>>>
>>>>> scala> import scala.collection.mutable.MutableList
>>>>> import scala.collection.mutable.MutableList
>>>>>
>>>>> scala> val lst = MutableList[(String,String,Double)]()
>>>>> lst: scala.collection.mutable.MutableList[(String, String, Double)] =
>>>>> MutableList()
>>>>>
>>>>> scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>>>>
>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>> <console>:27: error: not found: value a
>>>>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>>                                           ^
>>>>>
>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
>>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at
>>>>> <console>:27
>>>>>
>>>>> scala> rdd.count()
>>>>> ...
>>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at
>>>>> <console>:30, took 0.478350 s
>>>>> res1: Long = 10000
>>>>>
>>>>> Ashish:
>>>>> Please refine your example to mimic more closely what your code
>>>>> actually did.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <so...@cloudera.com>
>>>>> wrote:
>>>>>
>>>>>> That can't cause any error, since there is no action in your first
>>>>>> snippet. Even calling count on the result doesn't cause an error. You
>>>>>> must be executing something different.
>>>>>>
>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty <as...@gmail.com>
>>>>>> wrote:
>>>>>> > I am running the Spark shell (1.2.1) in local mode and I have a
>>>>>> simple
>>>>>> > RDD[(String,String,Double)] with about 10,000 objects in it. I get a
>>>>>> > StackOverFlowError each time I try to run the following code (the
>>>>>> code
>>>>>> > itself is just representative of other logic where I need to pass
>>>>>> in a
>>>>>> > variable). I tried broadcasting the variable too, but no luck ..
>>>>>> missing
>>>>>> > something basic here -
>>>>>> >
>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>)
>>>>>> > val a=10
>>>>>> > rdd.map(r => if (a==10) 1 else 0)
>>>>>> > This throws -
>>>>>> >
>>>>>> > java.lang.StackOverflowError
>>>>>> >     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
>>>>>> >     at
>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
>>>>>> >     at
>>>>>> >
>>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>> >     at
>>>>>> >
>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>> >     at
>>>>>> >
>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>> >     at
>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>> >     at
>>>>>> >
>>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>> >     at
>>>>>> >
>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>> >     at
>>>>>> >
>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>> > ...
>>>>>> > ...
>>>>>> >
>>>>>> > More experiments  .. this works -
>>>>>> >
>>>>>> > val lst = Range(0,10000).map(i=>("10","10",i:Double)).toList
>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>>> >
>>>>>> > But below doesn't and throws the StackoverflowError -
>>>>>> >
>>>>>> > val lst = MutableList[(String,String,Double)]()
>>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>>> >
>>>>>> > Any help appreciated!
>>>>>> >
>>>>>> > Thanks,
>>>>>> > Ashish
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > --
>>>>>> > View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
>>>>>> > Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
>>>>>> >
>>>>>> >
>>>>>> ---------------------------------------------------------------------
>>>>>> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>> > For additional commands, e-mail: user-help@spark.apache.org
>>>>>> >
>>>>>>
>>>>>> ---------------------------------------------------------------------
>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>>
>>>>>>
>>>>>
>>>
>

Re: Spark shell and StackOverFlowError

Posted by Ted Yu <yu...@gmail.com>.
I got StackOverFlowError as well :-(

On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty <as...@gmail.com>
wrote:

> Yep .. I tried that too earlier. Doesn't make a difference. Are you able
> to replicate on your side?
>
>
> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <yu...@gmail.com> wrote:
>
>> I see.
>>
>> What about using the following in place of variable a ?
>>
>> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>>
>> Cheers
>>
>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty <ashish.shrowty@gmail.com
>> > wrote:
>>
>>> @Sean - Agree that there is no action, but I still get the
>>> stackoverflowerror, its very weird
>>>
>>> @Ted - Variable a is just an int - val a = 10 ... The error happens
>>> when I try to pass a variable into the closure. The example you have above
>>> works fine since there is no variable being passed into the closure from
>>> the shell.
>>>
>>> -Ashish
>>>
>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yu...@gmail.com> wrote:
>>>
>>>> Using Spark shell :
>>>>
>>>> scala> import scala.collection.mutable.MutableList
>>>> import scala.collection.mutable.MutableList
>>>>
>>>> scala> val lst = MutableList[(String,String,Double)]()
>>>> lst: scala.collection.mutable.MutableList[(String, String, Double)] =
>>>> MutableList()
>>>>
>>>> scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>>>
>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>> <console>:27: error: not found: value a
>>>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>                                           ^
>>>>
>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at
>>>> <console>:27
>>>>
>>>> scala> rdd.count()
>>>> ...
>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at
>>>> <console>:30, took 0.478350 s
>>>> res1: Long = 10000
>>>>
>>>> Ashish:
>>>> Please refine your example to mimic more closely what your code
>>>> actually did.
>>>>
>>>> Thanks
>>>>
>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <so...@cloudera.com> wrote:
>>>>
>>>>> That can't cause any error, since there is no action in your first
>>>>> snippet. Even calling count on the result doesn't cause an error. You
>>>>> must be executing something different.
>>>>>
>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty <as...@gmail.com>
>>>>> wrote:
>>>>> > I am running the Spark shell (1.2.1) in local mode and I have a
>>>>> simple
>>>>> > RDD[(String,String,Double)] with about 10,000 objects in it. I get a
>>>>> > StackOverFlowError each time I try to run the following code (the
>>>>> code
>>>>> > itself is just representative of other logic where I need to pass in
>>>>> a
>>>>> > variable). I tried broadcasting the variable too, but no luck ..
>>>>> missing
>>>>> > something basic here -
>>>>> >
>>>>> > val rdd = sc.makeRDD(List(<Data read from file>)
>>>>> > val a=10
>>>>> > rdd.map(r => if (a==10) 1 else 0)
>>>>> > This throws -
>>>>> >
>>>>> > java.lang.StackOverflowError
>>>>> >     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
>>>>> >     at
>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
>>>>> >     at
>>>>> >
>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>> >     at
>>>>> >
>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>> >     at
>>>>> >
>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>> >     at
>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>> >     at
>>>>> >
>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>> >     at
>>>>> >
>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>> >     at
>>>>> >
>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>> > ...
>>>>> > ...
>>>>> >
>>>>> > More experiments  .. this works -
>>>>> >
>>>>> > val lst = Range(0,10000).map(i=>("10","10",i:Double)).toList
>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>> >
>>>>> > But below doesn't and throws the StackoverflowError -
>>>>> >
>>>>> > val lst = MutableList[(String,String,Double)]()
>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>>> >
>>>>> > Any help appreciated!
>>>>> >
>>>>> > Thanks,
>>>>> > Ashish
>>>>> >
>>>>> >
>>>>> >
>>>>> > --
>>>>> > View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
>>>>> > Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>> >
>>>>> > ---------------------------------------------------------------------
>>>>> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>> > For additional commands, e-mail: user-help@spark.apache.org
>>>>> >
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>
>>>>>
>>>>
>>

Re: Spark shell and StackOverFlowError

Posted by Ashish Shrowty <as...@gmail.com>.
Yep .. I tried that too earlier. Doesn't make a difference. Are you able to
replicate on your side?


On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <yu...@gmail.com> wrote:

> I see.
>
> What about using the following in place of variable a ?
>
> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>
> Cheers
>
> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty <as...@gmail.com>
> wrote:
>
>> @Sean - Agree that there is no action, but I still get the
>> stackoverflowerror, its very weird
>>
>> @Ted - Variable a is just an int - val a = 10 ... The error happens when
>> I try to pass a variable into the closure. The example you have above works
>> fine since there is no variable being passed into the closure from the
>> shell.
>>
>> -Ashish
>>
>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yu...@gmail.com> wrote:
>>
>>> Using Spark shell :
>>>
>>> scala> import scala.collection.mutable.MutableList
>>> import scala.collection.mutable.MutableList
>>>
>>> scala> val lst = MutableList[(String,String,Double)]()
>>> lst: scala.collection.mutable.MutableList[(String, String, Double)] =
>>> MutableList()
>>>
>>> scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>>
>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>> <console>:27: error: not found: value a
>>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>                                           ^
>>>
>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at
>>> <console>:27
>>>
>>> scala> rdd.count()
>>> ...
>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at
>>> <console>:30, took 0.478350 s
>>> res1: Long = 10000
>>>
>>> Ashish:
>>> Please refine your example to mimic more closely what your code actually
>>> did.
>>>
>>> Thanks
>>>
>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <so...@cloudera.com> wrote:
>>>
>>>> That can't cause any error, since there is no action in your first
>>>> snippet. Even calling count on the result doesn't cause an error. You
>>>> must be executing something different.
>>>>
>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty <as...@gmail.com>
>>>> wrote:
>>>> > I am running the Spark shell (1.2.1) in local mode and I have a simple
>>>> > RDD[(String,String,Double)] with about 10,000 objects in it. I get a
>>>> > StackOverFlowError each time I try to run the following code (the code
>>>> > itself is just representative of other logic where I need to pass in a
>>>> > variable). I tried broadcasting the variable too, but no luck ..
>>>> missing
>>>> > something basic here -
>>>> >
>>>> > val rdd = sc.makeRDD(List(<Data read from file>)
>>>> > val a=10
>>>> > rdd.map(r => if (a==10) 1 else 0)
>>>> > This throws -
>>>> >
>>>> > java.lang.StackOverflowError
>>>> >     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
>>>> >     at
>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
>>>> >     at
>>>> >
>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>> >     at
>>>> >
>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>> >     at
>>>> >
>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>> >     at
>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>> >     at
>>>> >
>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>> >     at
>>>> >
>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>> >     at
>>>> >
>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>> > ...
>>>> > ...
>>>> >
>>>> > More experiments  .. this works -
>>>> >
>>>> > val lst = Range(0,10000).map(i=>("10","10",i:Double)).toList
>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>> >
>>>> > But below doesn't and throws the StackoverflowError -
>>>> >
>>>> > val lst = MutableList[(String,String,Double)]()
>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>>> >
>>>> > Any help appreciated!
>>>> >
>>>> > Thanks,
>>>> > Ashish
>>>> >
>>>> >
>>>> >
>>>> > --
>>>> > View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
>>>> > Sent from the Apache Spark User List mailing list archive at
>>>> Nabble.com.
>>>> >
>>>> > ---------------------------------------------------------------------
>>>> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>> > For additional commands, e-mail: user-help@spark.apache.org
>>>> >
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>
>>>>
>>>
>

Re: Spark shell and StackOverFlowError

Posted by Ted Yu <yu...@gmail.com>.
I see.

What about using the following in place of variable a ?
http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

Cheers

On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty <as...@gmail.com>
wrote:

> @Sean - Agree that there is no action, but I still get the
> stackoverflowerror, its very weird
>
> @Ted - Variable a is just an int - val a = 10 ... The error happens when
> I try to pass a variable into the closure. The example you have above works
> fine since there is no variable being passed into the closure from the
> shell.
>
> -Ashish
>
> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yu...@gmail.com> wrote:
>
>> Using Spark shell :
>>
>> scala> import scala.collection.mutable.MutableList
>> import scala.collection.mutable.MutableList
>>
>> scala> val lst = MutableList[(String,String,Double)]()
>> lst: scala.collection.mutable.MutableList[(String, String, Double)] =
>> MutableList()
>>
>> scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>
>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> <console>:27: error: not found: value a
>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>                                           ^
>>
>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at
>> <console>:27
>>
>> scala> rdd.count()
>> ...
>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at
>> <console>:30, took 0.478350 s
>> res1: Long = 10000
>>
>> Ashish:
>> Please refine your example to mimic more closely what your code actually
>> did.
>>
>> Thanks
>>
>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> That can't cause any error, since there is no action in your first
>>> snippet. Even calling count on the result doesn't cause an error. You
>>> must be executing something different.
>>>
>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty <as...@gmail.com>
>>> wrote:
>>> > I am running the Spark shell (1.2.1) in local mode and I have a simple
>>> > RDD[(String,String,Double)] with about 10,000 objects in it. I get a
>>> > StackOverFlowError each time I try to run the following code (the code
>>> > itself is just representative of other logic where I need to pass in a
>>> > variable). I tried broadcasting the variable too, but no luck ..
>>> missing
>>> > something basic here -
>>> >
>>> > val rdd = sc.makeRDD(List(<Data read from file>)
>>> > val a=10
>>> > rdd.map(r => if (a==10) 1 else 0)
>>> > This throws -
>>> >
>>> > java.lang.StackOverflowError
>>> >     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
>>> >     at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
>>> >     at
>>> >
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>> >     at
>>> >
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>> >     at
>>> >
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>> >     at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>> >     at
>>> >
>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>> >     at
>>> >
>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>> >     at
>>> >
>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>> > ...
>>> > ...
>>> >
>>> > More experiments  .. this works -
>>> >
>>> > val lst = Range(0,10000).map(i=>("10","10",i:Double)).toList
>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>> >
>>> > But below doesn't and throws the StackoverflowError -
>>> >
>>> > val lst = MutableList[(String,String,Double)]()
>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>>> >
>>> > Any help appreciated!
>>> >
>>> > Thanks,
>>> > Ashish
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>> >
>>> > ---------------------------------------------------------------------
>>> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> > For additional commands, e-mail: user-help@spark.apache.org
>>> >
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>> For additional commands, e-mail: user-help@spark.apache.org
>>>
>>>
>>

Re: Spark shell and StackOverFlowError

Posted by Ashish Shrowty <as...@gmail.com>.
@Sean - Agree that there is no action, but I still get the
stackoverflowerror, its very weird

@Ted - Variable a is just an int - val a = 10 ... The error happens when I
try to pass a variable into the closure. The example you have above works
fine since there is no variable being passed into the closure from the
shell.

-Ashish

On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yu...@gmail.com> wrote:

> Using Spark shell :
>
> scala> import scala.collection.mutable.MutableList
> import scala.collection.mutable.MutableList
>
> scala> val lst = MutableList[(String,String,Double)]()
> lst: scala.collection.mutable.MutableList[(String, String, Double)] =
> MutableList()
>
> scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>
> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> <console>:27: error: not found: value a
>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>                                           ^
>
> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at
> <console>:27
>
> scala> rdd.count()
> ...
> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at
> <console>:30, took 0.478350 s
> res1: Long = 10000
>
> Ashish:
> Please refine your example to mimic more closely what your code actually
> did.
>
> Thanks
>
> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <so...@cloudera.com> wrote:
>
>> That can't cause any error, since there is no action in your first
>> snippet. Even calling count on the result doesn't cause an error. You
>> must be executing something different.
>>
>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty <as...@gmail.com>
>> wrote:
>> > I am running the Spark shell (1.2.1) in local mode and I have a simple
>> > RDD[(String,String,Double)] with about 10,000 objects in it. I get a
>> > StackOverFlowError each time I try to run the following code (the code
>> > itself is just representative of other logic where I need to pass in a
>> > variable). I tried broadcasting the variable too, but no luck .. missing
>> > something basic here -
>> >
>> > val rdd = sc.makeRDD(List(<Data read from file>)
>> > val a=10
>> > rdd.map(r => if (a==10) 1 else 0)
>> > This throws -
>> >
>> > java.lang.StackOverflowError
>> >     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
>> >     at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
>> >     at
>> >
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> >     at
>> > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> >     at
>> >
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> >     at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>> >     at
>> >
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> >     at
>> > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> >     at
>> >
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> > ...
>> > ...
>> >
>> > More experiments  .. this works -
>> >
>> > val lst = Range(0,10000).map(i=>("10","10",i:Double)).toList
>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >
>> > But below doesn't and throws the StackoverflowError -
>> >
>> > val lst = MutableList[(String,String,Double)]()
>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >
>> > Any help appreciated!
>> >
>> > Thanks,
>> > Ashish
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > ---------------------------------------------------------------------
>> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> > For additional commands, e-mail: user-help@spark.apache.org
>> >
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> For additional commands, e-mail: user-help@spark.apache.org
>>
>>
>

Re: Spark shell and StackOverFlowError

Posted by Ted Yu <yu...@gmail.com>.
Using Spark shell :

scala> import scala.collection.mutable.MutableList
import scala.collection.mutable.MutableList

scala> val lst = MutableList[(String,String,Double)]()
lst: scala.collection.mutable.MutableList[(String, String, Double)] =
MutableList()

scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))

scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
<console>:27: error: not found: value a
       val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
                                          ^

scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at
<console>:27

scala> rdd.count()
...
15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at <console>:30,
took 0.478350 s
res1: Long = 10000

Ashish:
Please refine your example to mimic more closely what your code actually
did.

Thanks

On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <so...@cloudera.com> wrote:

> That can't cause any error, since there is no action in your first
> snippet. Even calling count on the result doesn't cause an error. You
> must be executing something different.
>
> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty <as...@gmail.com>
> wrote:
> > I am running the Spark shell (1.2.1) in local mode and I have a simple
> > RDD[(String,String,Double)] with about 10,000 objects in it. I get a
> > StackOverFlowError each time I try to run the following code (the code
> > itself is just representative of other logic where I need to pass in a
> > variable). I tried broadcasting the variable too, but no luck .. missing
> > something basic here -
> >
> > val rdd = sc.makeRDD(List(<Data read from file>)
> > val a=10
> > rdd.map(r => if (a==10) 1 else 0)
> > This throws -
> >
> > java.lang.StackOverflowError
> >     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
> >     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
> >     at
> >
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> >     at
> > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> >     at
> >
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> >     at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> >     at
> >
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> >     at
> > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> >     at
> >
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> > ...
> > ...
> >
> > More experiments  .. this works -
> >
> > val lst = Range(0,10000).map(i=>("10","10",i:Double)).toList
> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >
> > But below doesn't and throws the StackoverflowError -
> >
> > val lst = MutableList[(String,String,Double)]()
> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >
> > Any help appreciated!
> >
> > Thanks,
> > Ashish
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> > For additional commands, e-mail: user-help@spark.apache.org
> >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>
>

Re: Spark shell and StackOverFlowError

Posted by Sean Owen <so...@cloudera.com>.
That can't cause any error, since there is no action in your first
snippet. Even calling count on the result doesn't cause an error. You
must be executing something different.

On Sun, Aug 30, 2015 at 4:21 AM, ashrowty <as...@gmail.com> wrote:
> I am running the Spark shell (1.2.1) in local mode and I have a simple
> RDD[(String,String,Double)] with about 10,000 objects in it. I get a
> StackOverFlowError each time I try to run the following code (the code
> itself is just representative of other logic where I need to pass in a
> variable). I tried broadcasting the variable too, but no luck .. missing
> something basic here -
>
> val rdd = sc.makeRDD(List(<Data read from file>)
> val a=10
> rdd.map(r => if (a==10) 1 else 0)
> This throws -
>
> java.lang.StackOverflowError
>     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> ...
> ...
>
> More experiments  .. this works -
>
> val lst = Range(0,10000).map(i=>("10","10",i:Double)).toList
> sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>
> But below doesn't and throws the StackoverflowError -
>
> val lst = MutableList[(String,String,Double)]()
> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
> sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>
> Any help appreciated!
>
> Thanks,
> Ashish
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
> For additional commands, e-mail: user-help@spark.apache.org
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Spark shell and StackOverFlowError

Posted by ponkin <al...@ya.ru>.
Hi,
Can not reproduce your error on Spark 1.2.1 . It is not enough information.
What is your command line arguments wцру you starting spark-shell? what data
are you reading? etc.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508p24531.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org